c++ multithreading performance real-time thread-synchronization

Consecutive parallel computation with very low workload per invocation


Let's suppose I want to compute the methods A and B in the following example in parallel:

while (true)
{
   int state = NextState();

   int a = A(state);
   int b = B(state);

   ImportantMethod(a, b);
}

NextState cannot be computed ahead of time in this example. As you can see, the arguments of the methods A and B depend on the state. The resulting values a and b are passed to ImportantMethod, which has to be called every iteration.

This requires the multithreaded code to call A and B in parallel and wait for their results within a single iteration. It is not possible to combine consecutive iterations to create a larger workload.

ImportantMethod is used in a real-time application and is currently called too infrequently. The methods A and B have quite a small workload: roughly 10 multiplications and 2 trigonometric functions (sin, cos), just to put that into perspective. A and B are the bottleneck, though; ImportantMethod just requires the loop to be extremely fast.

I have considered using two threads for A and B and waking them up every iteration using a condition variable. But considering the small workload, I fear that the overhead of waking up the threads and waiting for their results is larger than the computation itself. I could also try busy waiting on a boolean flag without any synchronization. That would hog the CPU, but that would be acceptable.
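For reference, the condition-variable variant considered above could look roughly like the following. This is only a minimal sketch with a single helper thread: `CvWorker`, `Post`, `Wait` and `IterationSum` are hypothetical names, the bodies of `A` and `B` are placeholders for the real methods, and returning `a + b` merely stands in for the call to `ImportantMethod`.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-ins for the real A and B.
inline int A(int state) { return state + 1; }
inline int B(int state) { return state * 2; }

// One helper thread that computes B(state) whenever a state is posted.
class CvWorker
{
public:
  CvWorker() : thread_([this] { Run(); }) {}

  ~CvWorker()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    cv_.notify_all();
    thread_.join();
  }

  // Hand a new state to the helper and wake it up.
  void Post(int state)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(!hasWork_ && !hasResult_); // only one job in flight at a time
      state_ = state;
      hasWork_ = true;
    }
    cv_.notify_all();
  }

  // Block until the helper has published its result.
  int Wait()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return hasResult_; });
    hasResult_ = false;
    return result_;
  }

private:
  void Run()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;)
    {
      cv_.wait(lock, [this] { return hasWork_ || stop_; });
      if (stop_) return;
      hasWork_ = false;
      const int state = state_;
      lock.unlock();
      const int result = B(state); // compute outside the lock
      lock.lock();
      result_ = result;
      hasResult_ = true;
      cv_.notify_all();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  int state_ = 0;
  int result_ = 0;
  bool hasWork_ = false;
  bool hasResult_ = false;
  bool stop_ = false;
  std::thread thread_; // declared last so all other members exist before Run() starts
};

// One loop iteration: B runs on the helper while A runs on the calling thread.
inline int IterationSum(CvWorker& worker, int state)
{
  worker.Post(state);
  const int a = A(state);
  const int b = worker.Wait();
  return a + b; // placeholder for ImportantMethod(a, b)
}
```

Every iteration pays for two lock acquisitions on each side plus a wakeup of the sleeping helper, which is exactly the overhead in question for a workload this small.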

I have simplified the problem a lot, and I don't think I can create a test application that would give me performance metrics comparable to those of the real application. Implementing this in the real application is going to be really complicated, though, so I would like to gain more knowledge before attempting it.

Therefore I would like to ask if someone has experience with these types of problems, particularly with very frequent thread wakeups and synchronization.

Can busy waiting be preferable to notifying with a condition variable? Are there any other methods of synchronizing multiple threads that I have not considered yet and that might be better suited?


Solution

  • Ok, I did some research now. Maybe my results will be interesting for someone. Keep in mind that some parts of the code below are Windows-specific (thread affinity and sleep, to be precise).

    So it looks like you can make really small workloads faster by running them in parallel. But it basically requires stalling some (or all) cores with busy waiting. I could not achieve any speedup otherwise. It is also really important to set the thread affinity so that the threads are executed on specific cores. In my tests, not doing that again resulted in slower execution than the sequential counterpart.

    What i came up with is the following:

    1. Set main thread affinity to a specific core
    2. Spawn N worker threads and set those affinities to different cores as well
    3. Busy wait in worker threads until a job for computation arrives
    4. Use the main thread to assign jobs to the worker threads, and have it busy loop as well to check whether any worker thread has finished its job

    This results in all participating cores (including the main thread's core) being maxed out at 100% for the duration of the computations. The worker threads do not terminate between consecutive parallel calls, which saves the thread start overhead.
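The busy-waiting handoff in steps 3 and 4 can be sketched in portable C++ with `std::atomic` flags. This is a simplified illustration with a single worker and not the full test code: `SpinWorker`, `Post`, `Wait` and `IterationSum` are hypothetical names, `A` and `B` are placeholders, and the thread affinity from steps 1 and 2 is left out because it is platform-specific.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-ins for the real A and B.
inline int A(int state) { return state + 1; }
inline int B(int state) { return state * 2; }

// One worker thread that spins on an atomic flag instead of sleeping.
class SpinWorker
{
public:
  SpinWorker() : thread_([this] { Run(); }) {}

  ~SpinWorker()
  {
    running_.store(false, std::memory_order_release);
    thread_.join();
  }

  // Publish a new state; the release store makes state_ visible to the worker.
  void Post(int state)
  {
    assert(!busy_.load(std::memory_order_acquire)); // only one job in flight at a time
    state_ = state;
    busy_.store(true, std::memory_order_release);
  }

  // Spin until the worker clears the flag, then read its result.
  int Wait()
  {
    while (busy_.load(std::memory_order_acquire)) {}
    return result_;
  }

private:
  void Run()
  {
    while (running_.load(std::memory_order_acquire))
    {
      if (!busy_.load(std::memory_order_acquire)) continue; // no job yet, keep spinning
      result_ = B(state_);
      busy_.store(false, std::memory_order_release); // publish result_ to the poster
    }
  }

  std::atomic<bool> running_{true};
  std::atomic<bool> busy_{false};
  int state_ = 0;      // written by the poster, read by the worker
  int result_ = 0;     // written by the worker, read by the poster
  std::thread thread_; // declared last so the flags exist before Run() starts
};

// One loop iteration: B runs on the spinning worker while A runs on the caller.
inline int IterationSum(SpinWorker& worker, int state)
{
  worker.Post(state);
  const int a = A(state);
  const int b = worker.Wait();
  return a + b; // placeholder for ImportantMethod(a, b)
}
```

The acquire/release pairs on `busy_` are what make the plain `state_` and `result_` fields safe to share. A plain `bool` flag gives no such guarantee, and the compiler is allowed to hoist its load out of the spin loop.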

    It should be noted that, at least on Windows, you have to wait some time for Windows to actually move the threads to the requested cores. I did that using Sleep. The following image shows my CPU usage during the test (60s).

    [Image: CPU usage during 60s in percent (8 logical cores)]

    I have marked a few positions on the graph of the first core:

    1. Windows moved all worker threads to their specific cores. Core#0 is not occupied anymore.
    2. The task scheduler is started on Core#0 and the parallel computation has begun.
    3. The task scheduler is finished and all other cores are going back to normal as well.
    4. Sequential computation has started
    5. Sequential computation is done

    I have tested the timings with two different degrees of parallelism:


    32 tasks (essentially up to 7 in parallel, the image above is captured during this test):

    parallel: 2.7s

    sequential: 7.1s


    2 tasks (2 in parallel, other cores busy waiting (wasted)):

    parallel: 0.365s

    sequential: 0.464s


    As you can see, there is some speedup even with only 2 tasks. It is not half the time, but considering the small workload it is not bad, I guess. I am actually quite happy that the highly parallel one performed really well. Keep in mind that the workload is still really small and that, after all tasks are completed, everything is synced before starting the next iteration. The tradeoff is that all participating cores are completely blocked for as long as parallel computations can happen.
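To put the timings into perspective, the speedup and parallel efficiency can be worked out with a few helper functions. The function names below are made up for illustration. The per-invocation figure assumes 1e6 invocations, as set in the test code below; that the 2-task run used the same count is an assumption.

```cpp
#include <cassert>

// Speedup = sequential time / parallel time.
inline double Speedup(double sequentialSeconds, double parallelSeconds)
{
  assert(parallelSeconds > 0.0);
  return sequentialSeconds / parallelSeconds;
}

// Parallel efficiency = speedup / number of worker cores doing useful work.
inline double Efficiency(double speedup, int workers)
{
  assert(workers > 0);
  return speedup / workers;
}

// Average duration of one scheduler invocation in microseconds.
inline double MicrosecondsPerInvocation(double totalSeconds, double invocations)
{
  assert(invocations > 0.0);
  return totalSeconds / invocations * 1e6;
}
```

With the numbers above, `Speedup(7.1, 2.7)` is roughly 2.6 for the 32-task run, which is an efficiency of about 0.38 across the 7 worker cores. `Speedup(0.464, 0.365)` is roughly 1.27 for the 2-task run, or about 0.64 across 2 workers. At 1e6 invocations, one parallel invocation of the 32 tasks takes about 2.7 microseconds on average, including all synchronization.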

    For anyone interested, here is my testing code:

    #include <atomic>
    #include <chrono>
    #include <cmath>
    #include <cstdlib>
    #include <iostream>
    #include <thread>
    #include <windows.h>
    
    //Object that can compute something to simulate workload
    class ComputeObject
    {
    public:
      float A;
      float B;
      float C;
    
      void Compute()
      {
        //Do some calculations that approximately match the small workload
        C = float(sin(A)) + float(cos(B));
        C = C * A + atan2(A, B);
        C /= A + B;
      }
    };
    
    //Stores some information for the worker thread that is responsible for this task
    struct Task
    {
      ComputeObject* ComputeObject = nullptr; //the current compute object
      std::atomic<bool> AssignedFlag{false}; //flag that specifies if the compute object has a valid object; atomic because the scheduler and the worker both access it
      std::thread WorkerThread; //the thread
    };
    
    //Pointer to an array of Task
    Task* Tasks;
    
    //Number of Cpus (logical cores) and number of worker tasks
    int NumCpus;
    int NumTask;
    
    //Flag, that is used to stop the workers when computation is done
    //atomic because it is written by the main thread and read by all workers
    std::atomic<bool> WorkersRunning{false};
    
    //Main function for each worker
    void TaskWorker(const int workerIndex)
    {
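      //Assign the worker to a specific logical core.
      //Skip the first one, because the scheduler is going to block that one.
      //The mask is a DWORD_PTR, so shift a DWORD_PTR instead of an int to stay valid beyond 31 cores.
      SetThreadAffinityMask(GetCurrentThread(), DWORD_PTR(1) << (workerIndex + 1));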
    
      //Get pointer to task struct for current worker
      const auto task = Tasks + workerIndex;
      while (WorkersRunning)
      {
        while (!task->AssignedFlag && WorkersRunning); //Wait as long as no valid ComputeObject is set or the workers are stopped.
        if (!WorkersRunning) break; //Get out of the loop when workers are stopped.
        task->ComputeObject->Compute(); //Do computation
        task->AssignedFlag = false; //Invalidate current ComputeObject, so that a new one can be assigned from the scheduler
      }
    }
    
    //The scheduler runs on the main thread and constantly checks whether workers are finished with their ComputeObject and assigns new ones
    void TaskScheduler(ComputeObject* computeObjects, const int numComputeObjects)
    {
      const auto computeObjectsStart = computeObjects;
      const auto computeObjectsEnd = computeObjects + numComputeObjects;
      const auto tasksStart = Tasks;
      const auto tasksEnd = Tasks + NumTask;
    
      auto currentComputeObject = computeObjectsStart;
      auto currentTask = tasksStart;
    
      //as long as there are still ComputeObjects to be processed
      while (currentComputeObject != computeObjectsEnd)
      {
        if (!currentTask->AssignedFlag) //if current task has no valid ComputeObject yet
        {
          currentTask->ComputeObject = currentComputeObject++; //assign new computeObject and advance
          currentTask->AssignedFlag = true; //set flag to signal that a ComputeObject has been assigned
        }
    
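        currentTask++; //advance to the next task
        if (currentTask == tasksEnd) currentTask = tasksStart; //go back to the first task if the last task was reached
      }
    
      //wait until every worker has finished its ComputeObject, so that all results are complete when the scheduler returns
      for (auto task = tasksStart; task != tasksEnd; task++)
        while (task->AssignedFlag);
    }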
    
    int main()
    {
      //get number of logical cores
      NumCpus = int(std::thread::hardware_concurrency());
      NumTask = NumCpus - 1; //first one is this thread and is going to be blocked by the scheduler
      Tasks = new Task[NumTask];
    
    
      const auto numParallelWork = 32; //number of computations that can be done in parallel
      const int numInvocations = 1e6; //number of invocations for time measurement
    
      //create ComputeObjects array and compute start/end pointers
      const auto computeObjects = new ComputeObject[numParallelWork];
      const auto computeObjectsStart = computeObjects;
      const auto computeObjectsEnd = computeObjects + numParallelWork;
    
      //fill ComputeObjects with random data
      for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
      {
        computeObject->A = float(rand()) / RAND_MAX;
        computeObject->B = float(rand()) / RAND_MAX;
      }
    
      //set workers running
      WorkersRunning = true;
    
      //spawn workers
      for (auto i = 0; i < NumTask; i++)
        Tasks[i].WorkerThread = std::thread(TaskWorker, i);
    
      //put this thread to first logical core
      SetThreadAffinityMask(GetCurrentThread(), 1 << 0);
    
      //wait 20s to allow windows to actually move the threads to the specified cores
      //monitor task manager to ensure windows actually did that
      Sleep(20000);
    
      std::chrono::steady_clock::time_point start, end;
      std::chrono::duration<double> elapsed;
    
    
    
      start = std::chrono::steady_clock::now(); //start time measurement
    
      //invoke task scheduler a few times
      for (auto i = 0; i < numInvocations; i++)
        TaskScheduler(computeObjects, numParallelWork);
    
      end = std::chrono::steady_clock::now(); //end time measurement
      elapsed = end - start;
      std::cout << "parallel: " << elapsed.count() << "s" << std::endl;
    
    
      //stop workers and wait for all threads
      WorkersRunning = false;
      for (auto i = 0; i < NumTask; i++) Tasks[i].WorkerThread.join();
    
    
      //wait 10 seconds just for good measures
      Sleep(10000);
    
    
      start = std::chrono::steady_clock::now(); //start time measurement
    
      //invoke sequential loop a few times
      for (auto i = 0; i < numInvocations; i++)
        for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
          computeObject->Compute();
    
      end = std::chrono::steady_clock::now(); //end time measurement
      elapsed = end - start;
      std::cout << "sequential: " << elapsed.count() << "s" << std::endl;
    }