Tags: c++, multithreading, c++11, stdthread

How to know when a std::thread has finished in C++?


I am just learning how to use std::thread in C++11. Basically, I have a long list of data (imagine a for loop running from 0 to 15000) and 1568 hardware threads on the machine I am using. I want a separate thread to process each sample. I understand how to create the first 1568 threads, and that works fine. But once I reach sample N_thread+1, I want to check whether any threads have become available and, if so, send that data sample to one of them. Each thread runs a mutex-locked function that unlocks at the end. Perhaps I have misunderstood how threads work and cannot do things this way? Or perhaps there is a better threading/CPU-assignment library that can help?

As I said, I can get to the point where the 1568 threads are created, run and joined, and the end results are good. I just need a bit more information.

This is my main:

int main(){
  cout<<"In main"<<endl;
  CSVReaderUpdatedStructure reader("data.csv");
  vector<STMDataPacket> DataList = reader.GetData();

  thread_pool Pool(THREAD_COUNT);
  auto startT0 = chrono::high_resolution_clock::now();
  for(unsigned s=0; s<DataList.size()-1; s++){
    cout<<"analysing sample "<<s<<endl;
    auto done = Pool.add_task([s = s, Sample = DataList[s], t_inf = time_info, wf = writefile, f = factor]{ GetDMWPulses(s, Sample, t_inf, wf, f); });
    done.wait();
  }

  auto stop = chrono::high_resolution_clock::now();
  cout<<"pulses "<<pulses.size()<<endl;
  auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0);
  cout<<"time for MWD full process = "<<duration.count()<<" microseconds "<<endl;

  return 0;
}

Solution

  • You probably don't want 1568 threads. You want 1568+ tasks, maybe.

    You probably want a thread pool. TBB has a thread pool, and is available on almost every platform.
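
    As a rough illustration (a sketch, not tested against your code: process_sample and n_samples are placeholders for your per-sample work and sample count), tbb::parallel_for lets TBB's own thread pool split the 0 to 15000 range into tasks for you:

    #include <tbb/parallel_for.h>
    #include <cstddef>

    void process_sample(std::size_t i);   // stands in for your per-sample work

    void process_all(std::size_t n_samples) {
      // TBB sizes its worker pool to the hardware and schedules the chunks itself;
      // the call returns once every index in [0, n_samples) has been processed.
      tbb::parallel_for(std::size_t(0), n_samples, [](std::size_t i) {
        process_sample(i);
      });
    }

    (On Linux you typically link with -ltbb; process_sample here stands in for whatever GetDMWPulses does per sample.)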

    Writing your own thread pool isn't that hard. Here is a sketch of one:

    // The sketch below needs <condition_variable>, <mutex>, <queue>, <utility>,
    // <future>, <thread>, <vector>, <atomic> and <cstddef>. "optional" is
    // std::optional, boost::optional, or the hand-rolled one at the end of this answer.
    template<class T>
    struct threadsafe_queue {
      // Blocks until an element is available or the queue is aborted;
      // returns an empty optional on abort.
      optional<T> pop() {
        auto l = lock();
        cv.wait( l, [&]{
          return abort || !data.empty();
        });
        if (abort) return {};
        T retval = std::move(data.front());
        data.pop();
        return retval;
      }
      void push( T in ) {
        auto l = lock();
        data.push( std::move(in) );
        cv.notify_one();
      }
      // Wakes every blocked pop() and makes it return an empty optional.
      void abort_queue() {
        auto l = lock();
        abort = true;
        cv.notify_all();
      }
    private:
      mutable std::mutex m;
      std::condition_variable cv;
      std::queue<T> data;
      bool abort = false;
    
      std::unique_lock<std::mutex> lock() const {
        return std::unique_lock<std::mutex>(m);
      }
    };
    
    struct thread_pool {
      // Wraps the callable in a packaged_task, queues it, and returns a future
      // you can wait on to know when that particular task has finished.
      template<class F, class R=typename std::decay< typename std::result_of< F&() >::type>::type>
      auto add_task( F&& f )
      -> std::future< R >
      {
         std::packaged_task<R()> task( std::forward<F>(f) );
         auto retval = task.get_future();
         tasks.push( std::packaged_task<void()>(std::move(task)) );
         return retval;
      }
    
      void start_thread( std::size_t N=1 )
      {
        if (shutdown) return;
        for (std::size_t i = 0; i < N; ++i)
        {
          threads.emplace_back( [this]{
            // Each worker pops tasks until the queue is aborted.
            while (true)
            {
              if (shutdown) return;
              auto task = tasks.pop();
              if (!task)
                return;
              (*task)();
            }
          } );
        }
      }
      void cleanup() {
        shutdown = true;
        tasks.abort_queue();
        for (auto&& t:threads)
          t.join();
        threads.clear();
      }
      ~thread_pool() {
        cleanup();
      }
    
      thread_pool():thread_pool( std::thread::hardware_concurrency() ) {}
      explicit thread_pool( std::size_t N ) {
        start_thread(N);
      }
    private:
      threadsafe_queue<std::packaged_task<void()>> tasks;
      std::vector<std::thread> threads;
      // Brace-initialized: std::atomic cannot be copy-initialized before C++17.
      std::atomic<bool> shutdown{false};
    };
    

    Now create a thread_pool.

    Shove tasks into it. Get futures out.
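
    For instance, the simplest way to know when every task has finished is to keep the futures that add_task returns and wait on them after everything has been queued, rather than waiting inside the loop as in the question (which serializes the work). A minimal sketch, assuming the thread_pool above; THREAD_COUNT, sample_count and process_sample are stand-ins for your own names:

    thread_pool pool(THREAD_COUNT);
    std::vector<std::future<void>> results;
    results.reserve(sample_count);
    for (std::size_t s = 0; s < sample_count; ++s)
      results.push_back( pool.add_task([s]{ process_sample(s); }) );
    for (auto& r : results)
      r.wait();   // returns once that particular task has run to completion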

    Or have the worker tasks increment a std::atomic<unsigned int> and have the main thread wait for it to hit the maximum, or do something fancier:

    struct counting_barrier {
      explicit counting_barrier( std::size_t n ):count(n) {}
      // Called by each task when it finishes; thread safe.
      void operator--() {
        --count;
        if (count <= 0)
        {
           // Taking the lock before notifying ensures the waiter cannot miss
           // the wakeup between its predicate check and its wait.
           std::unique_lock<std::mutex> l(m);
           cv.notify_all();
        }
      }
      // Blocks the caller until the count has reached zero.
      void wait() {
        std::unique_lock<std::mutex> l(m);
        cv.wait( l, [&]{ return count <= 0; } );
      }
    private:
      std::mutex m;
      std::condition_variable cv;
      std::atomic<std::ptrdiff_t> count{0};
    };
    

    Create a counting_barrier barrier( 15000 ) or whatever. Tasks, when they finish, call --barrier (it is thread safe). The main thread can call barrier.wait() and it will be woken up once --barrier has been called 15000 times.
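
    Applied to the loop in the question, that looks roughly like the sketch below. GetDMWPulses, DataList, THREAD_COUNT and the captured parameters are the asker's own names, so treat this as untested glue code:

    thread_pool Pool(THREAD_COUNT);
    const std::size_t n = DataList.size() - 1;   // same bound as the question's loop
    counting_barrier barrier(n);
    for (std::size_t s = 0; s < n; ++s) {
      // Note: no wait() inside the loop, so the tasks actually run in parallel.
      Pool.add_task([&barrier, s, Sample = DataList[s], t_inf = time_info, wf = writefile, f = factor]{
        GetDMWPulses(s, Sample, t_inf, wf, f);
        --barrier;   // signal that this sample is done
      });
    }
    barrier.wait();  // wakes up once --barrier has been called n times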

    The above code may have typos, but the design is sound. For industrial-strength use, you'll also want a better shutdown procedure.

    Live example.

    If you don't have std::optional or boost::optional, use this:

    // Needs <type_traits>, <utility> and <new> (for placement new).
    template<class T>
    struct optional {
      T* get() { return static_cast<T*>( static_cast<void*>( & data ) ); }
      T const* get() const { return static_cast<T const*>( static_cast<void const*>( & data ) ); }
    
      T& operator*() & { return *get(); }
      T&& operator*() && { return std::move(*get()); }
      T const& operator*() const & { return *get(); }
      T const&& operator*() const&& { return std::move(*get()); }
    
      explicit operator bool() const { return engaged; }
      bool has_value() const { return (bool)*this; }
      template< class U >
      T value_or( U&& default_value ) const& {
        if (*this) return **this;
        return std::forward<U>(default_value);
      }
      template< class U >
      T value_or( U&& default_value ) && {
        if (*this) return std::move(**this);
        return std::forward<U>(default_value);
      }
    
      optional(T const& t) {
        emplace(t);
      }
      optional(T&& t) {
        emplace(std::move(t));
      }
      optional() = default;
      optional(optional const& o) {
        if (o) {
          emplace( *o );
        }
      }
      optional(optional && o) {
        if (o) {
          emplace( std::move(*o) );
        }
      }
      optional& operator=(optional const& o) & {
        if (!o) {
          reset();
        } else if (*this) {
          **this = *o;
        } else {
          emplace( *o );
        }
        return *this;
      }
      optional& operator=(optional && o) & {
        if (!o) {
          reset();
        } else if (*this) {
          **this = std::move(*o);
        } else {
          emplace( std::move(*o) );
        }
        return *this;
      }
      template<class...Args>
      T& emplace(Args&&...args) {
        if (*this) reset();
        ::new( static_cast<void*>(&data) ) T(std::forward<Args>(args)...);
        engaged = true;
        return **this;
      }
      void reset() {
        if (*this) {
          get()->~T();
          engaged = false;
        }
      }
      ~optional() { reset(); }
    private:
      using storage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
      bool engaged = false;
      storage data;
    };
    

    Note that this optional isn't industrial strength; I literally wrote it and didn't test it. It is missing many features that a real optional has. But you can drop a real optional in its place and get the same or better behavior, so only use this one if you lack a real one. Putting the pieces together, usage looks like this:

    counting_barrier barrier(100);
    thread_pool p(10);
    for (int i = 0; i < 100; ++i)
    {
      p.add_task([&barrier,i]{
        std::stringstream ss;
        ss << i << ",";
        std::cout << ss.str();
        --barrier;
      });
    }
    barrier.wait();
    std::cout << "\n";
    auto done1 = p.add_task([]{ std::cout << "hello" << std::endl; });
    done1.wait();
    auto done2 = p.add_task([]{ std::cout << "world" << std::endl; });
    done2.wait();