Search code examples
c++mutexstdatomicconcurrent-processing

How to let different threads fill an array together?


Suppose I have some tasks (Monte Carlo simulations) that I want to run in parallel. I want to complete a given number of tasks, but tasks take different amount of time so not easy to divide the work evenly over the threads. Also: I need the results of all simulations in a single vector (or array) in the end.

So I come up with below approach:

 int Max{1000000};
 //SimResult is some struct with well-defined default value.
 std::vector<SimResult> vec(/*length*/Max);//Initialize with default values of SimResult
 int LastAdded{0};
 void fill(int RandSeed)
 { 
      Simulator sim{RandSeed};
      while(LastAdded < Max)
      {
           // Do some work to bring foo to the desired state
           //The duration of this work is subject to randomness
           vec[LastAdded++] 
                 = sim.GetResult();//Produces SimResult. 
      }
 }
 main()
 { 
       //launch a bunch of std::async that start
       auto fut1 = std::async(fill,1);
       auto fut2 = std::async(fill,2);
       //maybe some more tasks.


      fut1.get();
      fut2.get();
      //do something with the results in vec. 
 }

The above code will give race conditions I guess. I am looking for a performant approach to avoid that. Requirements: avoid race conditions (fill the entire array, no skips) ; final result is immediately in array ; performant.

Reading on various approaches, it seems atomic is a good candidate, but I am not sure what settings will be most performant in my case? And not even sure whether atomic will cut it; maybe a mutex guarding LastAdded is needed?


Solution

  • Since you already know how many elements your are going to work with and never change the size of the vector, the easiest solution is to let each thread work on it's own part of the vector. For example

    Update

    to accomodate for vastly varying calculation times, you should keep your current code, but avoid race conditions via a std::lock_guard. You will need a std::mutex that is the same for all threads, for example a global variable, or pass a reference of the mutex to each thread.

    void fill(int RandSeed, std::mutex &nextItemMutex)
    { 
          Simulator sim{RandSeed};
          size_t workingIndex;
          while(true)
          {
              {
                   // enter critical area
                   std::lock_guard<std::mutex> nextItemLock(nextItemMutex);
    
                   // Acquire next item
                   if(LastAdded < Max)
                   {
                       workingIndex = LastAdded;
                       LastAdded++;
                   } 
                   else 
                   {
                       break;
                   }
                   // lock is released when nextItemLock goes out of scope
              }
    
               // Do some work to bring foo to the desired state
               // The duration of this work is subject to randomness
               vec[workingIndex] = sim.GetResult();//Produces SimResult. 
          }
     }
    

    Problem with this is, that snychronisation is quite expensive. But it's probably not that expensive in comparison to the simulation you run, so it shouldn't be too bad.

    Version 2:

    To reduce the amount of synchronisation that is required, you could acquire blocks to work on, instead of single items:

    void fill(int RandSeed, std::mutex &nextItemMutex, size_t blockSize)
    { 
          Simulator sim{RandSeed};
          size_t workingIndex;
          while(true)
          {
              {
                   std::lock_guard<std::mutex> nextItemLock(nextItemMutex);
    
                   if(LastAdded < Max)
                   {
                       workingIndex = LastAdded;
                       LastAdded += blockSize;
                   } 
                   else 
                   {
                       break;
                   }
              }
              
              for(size_t i = workingIndex; i < workingIndex + blockSize && i < MAX; i++)
                  vec[i] = sim.GetResult();//Produces SimResult. 
          }
     }
    

    Simple Version

    void fill(int RandSeed, size_t partitionStart, size_t partitionEnd)
    { 
          Simulator sim{RandSeed};
          for(size_t i = partitionStart; i < partitionEnd; i++)
          {
               // Do some work to bring foo to the desired state
               // The duration of this work is subject to randomness
               vec[i] = sim.GetResult();//Produces SimResult. 
          }
     }
    
    main()
    { 
        //launch a bunch of std::async that start
        auto fut1 = std::async(fill,1, 0, Max / 2);
        auto fut2 = std::async(fill,2, Max / 2, Max);
    
        // ...
    }