I am just learning how to use std::thread in C++11. Basically, I have a long list of data (imagine a for loop from 0 to 15000) and 1568 threads available on the hardware I am using. I want a separate thread to process each sample. I understand how to create the first 1568 threads, and that works fine. But once I reach sample N_thread+1, I want to check whether any threads are available and, if so, send that data sample to one of them. Each thread runs a mutex-locked function that unlocks at the end. Perhaps I have misunderstood how threads work and cannot do things this way? Or perhaps there is a better threading/CPU-assignment library that can help?
As I said, I can get to the point where the 1568 threads are created, run and joined, and the end results are good. I just need a bit more information.
This is my main
int main(){
cout<<"In main"<<endl;
CSVReaderUpdatedStructure reader("data.csv");
vector<STMDataPacket> DataList = reader.GetData();
thread_pool Pool(THREAD_COUNT);
auto startT0 = chrono::high_resolution_clock::now();
for(unsigned s=0; s<DataList.size()-1; s++){
cout<<"analysing sample "<<s<<endl;
auto done = Pool.add_task([s= s, Sample= DataList[s], t_inf = time_info,wf=writefile, f=factor]{GetDMWPulses(s, Sample, t_inf, wf,f);});
done.wait();
}
auto stop = chrono::high_resolution_clock::now();
cout<<"pulses "<<pulses.size()<<endl;
auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0);
cout <<"time for MWD full process = "<< duration.count() <<" microseconds "<< endl;
return 0;
}
You probably don't want 1568 threads. You want 1568+ tasks, maybe.
You probably want a thread pool. TBB has a thread pool, and is available on almost every platform.
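To make the "tasks, not threads" point concrete before getting to a full pool, here is a minimal sketch, assuming the per-sample work is independent. A small fixed set of workers claims sample indices from a shared atomic counter. The names process_sample, process_all and default_worker_count are hypothetical, and process_sample merely stands in for the real per-sample work.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for the real per-sample work.
inline int process_sample(int sample) { return sample * 2; }

// Process every sample with a fixed number of worker threads.
// Each worker claims the next unprocessed index from a shared atomic
// counter, so no worker sits idle while samples remain.
std::vector<int> process_all(const std::vector<int>& samples,
                             std::size_t worker_count)
{
    assert(worker_count > 0);
    std::vector<int> results(samples.size());
    std::atomic<std::size_t> next{0};

    auto worker = [&] {
        for (;;) {
            const std::size_t i = next.fetch_add(1);
            if (i >= samples.size()) return;
            // Each index is claimed by exactly one worker, so writing to
            // results[i] needs no lock.
            results[i] = process_sample(samples[i]);
        }
    };

    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < worker_count; ++t)
        workers.emplace_back(worker);
    for (auto& w : workers)
        w.join();
    return results;
}

// hardware_concurrency() may return 0 when the value cannot be determined.
inline std::size_t default_worker_count()
{
    return std::max<std::size_t>(1, std::thread::hardware_concurrency());
}
```

Calling process_all(samples, default_worker_count()) handles 15000 samples with only as many threads as the hardware reports, instead of one thread per sample.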
Writing your own thread pool isn't that hard. Here is a sketch of one:
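// Headers needed by the sketch below.
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
// optional<T> is std::optional (C++17), boost::optional, or the fallback given further down.
template<class T>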
struct threadsafe_queue {
optional<T> pop() {
auto l = lock();
cv.wait( l, [&]{
return abort || !data.empty();
});
if (abort) return {};
T retval = std::move(data.front());
data.pop();
return retval;
}
void push( T in ) {
auto l = lock();
data.push( std::move(in) );
cv.notify_one();
}
void abort_queue() {
auto l = lock();
abort = true;
cv.notify_all();
}
private:
mutable std::mutex m;
std::condition_variable cv;
std::queue<T> data;
bool abort = false;
std::unique_lock<std::mutex> lock() const {
return std::unique_lock<std::mutex>(m);
}
};
struct thread_pool {
template<class F, class R=typename std::decay< typename std::result_of< F&() >::type>::type>
auto add_task( F&& f )
-> std::future< R >
{
std::packaged_task<R()> task( std::forward<F>(f) );
auto retval = task.get_future();
tasks.push( std::packaged_task<void()>(std::move(task)) );
return retval;
}
void start_thread( std::size_t N=1 )
{
if (shutdown) return;
for (std::size_t i = 0; i < N; ++i)
{
threads.emplace_back( [this]{
while (true)
{
if(shutdown) return;
auto task = tasks.pop();
if (!task)
return;
(*task)();
}
} );
}
}
void cleanup() {
shutdown = true;
tasks.abort_queue();
for (auto&& t:threads)
t.join();
threads.clear();
}
~thread_pool() {
cleanup();
}
thread_pool():thread_pool( std::thread::hardware_concurrency() ) {}
explicit thread_pool( std::size_t N ) {
start_thread(N);
}
private:
threadsafe_queue<std::packaged_task<void()>> tasks;
std::vector<std::thread> threads;
std::atomic<bool> shutdown{false}; // brace-init: "= false" does not compile before C++17
};
Now create a thread_pool. Shove tasks into it. Get futures out.
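The "futures out" part relies on one trick in add_task: a packaged_task<R()> is wrapped in a packaged_task<void()> so that one queue can hold tasks of any return type. Here is a minimal single-threaded sketch of just that step. The function name run_through_erased_task is hypothetical, and the sketch assumes a standard library whose std::packaged_task accepts a move-only callable, as the pool above also does.

```cpp
#include <future>
#include <queue>
#include <utility>

// Wrap a task that returns int in a type-erased void() task, run the
// erased task, and read the result back through the original future.
int run_through_erased_task(int x)
{
    std::packaged_task<int()> typed([x] { return x * x; });
    std::future<int> result = typed.get_future();

    // The queue only knows about void() tasks, as in the pool above.
    std::queue<std::packaged_task<void()>> queue;
    queue.push(std::packaged_task<void()>(std::move(typed)));

    std::packaged_task<void()> erased = std::move(queue.front());
    queue.pop();
    erased(); // runs the inner task, which makes `result` ready

    return result.get();
}
```

In the pool, the call to erased() happens on a worker thread, while the caller holds the future and blocks in get() or wait() only when it actually needs the result.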
Have the worker tasks increment a std::atomic<unsigned int> and wait for it to hit max, or do something fancier.
struct counting_barrier {
explicit counting_barrier( std::size_t n ):count(n) {}
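void operator--() {
// Test the value returned by the decrement itself, so that only the
// final decrement notifies and no thread re-reads count afterwards.
if (--count <= 0)
{
std::unique_lock<std::mutex> l(m);
cv.notify_all();
}
}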
void wait() {
std::unique_lock<std::mutex> l(m);
cv.wait( l, [&]{ return count <= 0; } );
}
private:
std::mutex m;
std::condition_variable cv;
std::atomic<std::ptrdiff_t> count{0}; // brace-init: "= 0" does not compile before C++17
};
Create a counting_barrier barrier( 15000 ) or whatever. Threads when done can --barrier (it is thread safe). The main thread can barrier.wait() and it will be woken up once -- has been called 15000 times.
The above code may have typos, but the design is sound. For industrial strength use, you'll also want to have a better shutdown procedure.
If you don't have optional or boost optional, use this:
template<class T>
struct optional {
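T* get() { return static_cast<T*>( static_cast<void*>( &data ) ); }
// The const overload must cast through void const*; casting away const with static_cast does not compile.
T const* get() const { return static_cast<T const*>( static_cast<void const*>( &data ) ); }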
T& operator*() & { return *get(); }
T&& operator*() && { return std::move(*get()); }
T const& operator*() const & { return *get(); }
T const&& operator*() const&& { return std::move(*get()); }
explicit operator bool() const { return engaged; }
bool has_value() const { return (bool)*this; }
template< class U >
T value_or( U&& default_value ) const& {
if (*this) return **this;
return std::forward<U>(default_value);
}
template< class U >
T value_or( U&& default_value ) && {
if (*this) return std::move(**this);
return std::forward<U>(default_value);
}
optional(T const& t) {
emplace(t);
}
optional(T&& t) {
emplace(std::move(t));
}
optional() = default;
optional(optional const& o) {
if (o) {
emplace( *o );
}
}
optional(optional && o) {
if (o) {
emplace( std::move(*o) );
}
}
optional& operator=(optional const& o) & {
if (!o) {
reset();
} else if (*this) {
**this = *o;
} else {
emplace( *o );
}
return *this;
}
optional& operator=(optional && o) & {
if (!o) {
reset();
} else if (*this) {
**this = std::move(*o);
} else {
emplace( std::move(*o) );
}
return *this;
}
template<class...Args>
T& emplace(Args&&...args) {
if (*this) reset();
::new( static_cast<void*>(&data) ) T(std::forward<Args>(args)...);
engaged = true;
return **this;
}
void reset() {
if (*this) {
get()->~T();
engaged = false;
}
}
~optional() { reset(); }
private:
using storage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
bool engaged = false;
storage data;
};
Note that this optional isn't industrial strength; I literally wrote it and didn't test it. It is missing many features that a real optional has. But you can drop a real optional in its place and get pretty much the same or better behavior, so it can be used if you lack one.
counting_barrier barrier(100);
thread_pool p(10);
for (int i = 0; i < 100; ++i)
{
p.add_task([&barrier,i]{
std::stringstream ss;
ss << i << ",";
std::cout << ss.str();
--barrier;
});
}
barrier.wait();
std::cout << "\n";
auto done1 = p.add_task([]{ std::cout << "hello" << std::endl; });
done1.wait();
auto done2 = p.add_task([]{ std::cout << "world" << std::endl; });
done2.wait();
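One caveat about waiting: calling wait() on each future immediately after add_task, as the loop in the question does, runs the tasks one at a time, because the next task is not submitted until the previous one finishes. Here is a minimal sketch of the two-phase alternative, which submits everything first and collects afterwards. To keep the snippet self-contained it uses std::async as a stand-in for the pool's add_task, and analyse and analyse_all are hypothetical names. Unlike the pool, std::launch::async may start one thread per task, so this only illustrates the order of submission and waiting.

```cpp
#include <future>
#include <vector>

// Hypothetical stand-in for the real per-sample work.
inline int analyse(int sample) { return sample + 1; }

std::vector<int> analyse_all(const std::vector<int>& samples)
{
    // Phase 1: submit every task without waiting on any of them.
    std::vector<std::future<int>> pending;
    pending.reserve(samples.size());
    for (int s : samples)
        pending.push_back(std::async(std::launch::async, analyse, s));

    // Phase 2: collect the results in submission order.
    std::vector<int> results;
    results.reserve(samples.size());
    for (auto& f : pending)
        results.push_back(f.get());
    return results;
}
```

With the thread_pool above, the same shape applies: push the future returned by each add_task call into a vector inside the loop, and call wait() or get() on them only after the loop has finished.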