Let's suppose I want to compute the methods A and B in the following example in parallel:
while (true)
{
    int state = NextState();
    int a = A(state);
    int b = B(state);
    ImportantMethod(a, b);
}
NextState cannot be computed ahead of time in this example.
As you can see, the arguments of the methods A and B depend on the state. The resulting values a and b are used to call ImportantMethod, which has to be called every iteration.
This requires the multithreading code to call A and B in parallel and wait for their results within a single iteration. It is not possible to combine consecutive iterations to create a larger workload. ImportantMethod is used in a real-time application and is currently called too infrequently.
The methods A and B have quite a small workload: roughly 10 multiplications and 2 trigonometric functions (sin, cos), just to put that into perspective. A and B are the bottleneck though; ImportantMethod just requires the loop to be extremely fast.
I have considered using two threads for A and B and waking them up every iteration using a condition variable. But considering the small workload, I fear that the overhead of waking up the threads and waiting for their results is larger than the computation itself. I could also try busy waiting with a boolean flag without any synchronization. That would hog the CPU really well, but that would be acceptable.
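To make that concrete, here is a rough sketch of the condition variable version I have in mind (heavily simplified, not the real application; the bodies of NextState, A, B and ImportantMethod are just dummies, and the generation counter is only my way of telling the workers that a new state is available):

#include <condition_variable>
#include <mutex>
#include <thread>

//Dummy stand-ins for the real methods
int NextState() { static int s = 0; return ++s; }
int A(int s) { return s + 1; }
int B(int s) { return s * 2; }
void ImportantMethod(int a, int b) { (void)a; (void)b; }

std::mutex Mutex;
std::condition_variable Cv;
int State = 0;
int Results[2];
unsigned Generation = 0;     //incremented once per iteration by the main loop
unsigned Done[2] = { 0, 0 }; //generation each worker has finished
bool Shutdown = false;

//One persistent worker; index 0 computes A, index 1 computes B
void Worker(const int index, int (*fn)(int))
{
    unsigned seen = 0;
    while (true)
    {
        int state;
        {
            std::unique_lock<std::mutex> lock(Mutex);
            Cv.wait(lock, [&] { return Shutdown || Generation != seen; });
            if (Shutdown) return;
            seen = Generation;
            state = State;
        }
        const int result = fn(state); //compute outside the lock
        {
            std::lock_guard<std::mutex> lock(Mutex);
            Results[index] = result;
            Done[index] = seen;
        }
        Cv.notify_all(); //wake the main loop
    }
}

int main()
{
    std::thread workerA(Worker, 0, A);
    std::thread workerB(Worker, 1, B);
    for (auto i = 0; i < 100000; i++)
    {
        {
            std::lock_guard<std::mutex> lock(Mutex);
            State = NextState();
            Generation++;
        }
        Cv.notify_all(); //wake both workers
        std::unique_lock<std::mutex> lock(Mutex);
        Cv.wait(lock, [&] { return Done[0] == Generation && Done[1] == Generation; });
        ImportantMethod(Results[0], Results[1]);
    }
    {
        std::lock_guard<std::mutex> lock(Mutex);
        Shutdown = true;
    }
    Cv.notify_all();
    workerA.join();
    workerB.join();
}

That is two lock/unlock round trips and at least two wakeups per iteration, which is exactly the overhead I am worried about.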
I have simplified the problem a lot, and I don't think I can create a test application that would give me accurate performance metrics comparable to what the real application would show. Implementing this in the real application is going to be really complicated though, so I would like to gain more knowledge before attempting it.
Therefore I would like to ask if someone has experience with these kinds of problems, particularly with very frequent thread wakeups and synchronization.
Can busy waiting be favorable over notifying with a condition variable? Are there any other methods to synchronize multiple threads that I have not considered yet and that might be better suited?
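For comparison, the busy waiting variant would look roughly like this (again only a sketch; I used std::atomic<bool> for the flags here, because sharing plain bools between threads is formally a data race, even though the idea is the same):

#include <atomic>
#include <functional>
#include <thread>

//Dummy stand-ins for the real methods
int NextState() { static int s = 0; return ++s; }
int A(int s) { return s + 1; }
int B(int s) { return s * 2; }
void ImportantMethod(int a, int b) { (void)a; (void)b; }

int State = 0;
int ResultA = 0, ResultB = 0;
std::atomic<bool> StartA(false), StartB(false); //set by the main loop when a new state is ready
std::atomic<bool> Running(true);

//Generic busy waiting worker: spins on its start flag, computes, clears the flag
void Worker(std::atomic<bool>& start, int (*fn)(int), int& result)
{
    while (Running)
    {
        while (!start && Running); //busy wait for new work
        if (!Running) break;
        result = fn(State);
        start = false; //signal completion to the main loop
    }
}

int main()
{
    std::thread workerA(Worker, std::ref(StartA), A, std::ref(ResultA));
    std::thread workerB(Worker, std::ref(StartB), B, std::ref(ResultB));
    for (auto i = 0; i < 100000; i++)
    {
        State = NextState();
        StartA = true; //hand the new state to both workers
        StartB = true;
        while (StartA || StartB); //busy wait until both workers are done
        ImportantMethod(ResultA, ResultB);
    }
    Running = false;
    workerA.join();
    workerB.join();
}

This keeps both worker cores spinning even while they are just waiting, but as I said, that would be acceptable.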
OK, I did some research now. Maybe my results are going to be interesting for someone. Keep in mind that some parts of the code below are Windows-specific (thread affinity and Sleep, to be precise).
So it looks like you can make really small workloads faster in parallel, but it requires basically stalling some (or all) cores with busy waiting. I could not achieve any speedup otherwise. It is also really important to set thread affinity so that the threads are executed on specific cores; in my tests, not doing that again resulted in slower speed than the sequential counterpart.
What I came up with is the following:
This results in all participating cores (including the main thread's core) being maxed out at 100% for the duration of the computations. The worker threads do not terminate between consecutive parallel calls, to save the thread start overhead.
It should be noted that, at least on Windows, you have to wait some time for Windows to actually move the threads to the requested cores. I did that using Sleep. The following image shows my CPU usage during the test (60s).
I have marked a few positions on the graph of the first core:
I have tested the timings with two different degrees of parallelism:
32 tasks (essentially up to 7 in parallel; the image above was captured during this test):
parallel: 2.7s
sequential: 7.1s
2 tasks (2 in parallel, other cores busy waiting (wasted)):
parallel: 0.365s
sequential: 0.464s
As you can see, there is some speedup even in the 2-task case. It is not half the time, but considering the small workload it is not bad, I guess. I am actually quite happy that the highly parallel case performed really well. Keep in mind that the workload is still really small, and after all tasks are completed everything is synced before the next iteration starts. The tradeoff is that all participating cores are completely blocked for as long as parallel computations can happen.
For anyone interested, here is my testing code:
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <windows.h>

//Object that can compute something to simulate workload
class ComputeObject
{
public:
    float A;
    float B;
    float C;

    void Compute()
    {
        //Do some calculations that approximately match the small workload
        C = float(sin(A)) + float(cos(B));
        C = C * A + atan2(A, B);
        C /= A + B;
    }
};

//Stores some information for the worker thread that is responsible for this task
struct Task
{
    ComputeObject* ComputeObject = nullptr;    //the current compute object
    std::atomic<bool> AssignedFlag{ false };   //flag that specifies whether a valid compute object is set (atomic, because it is written and read by different threads)
    std::thread WorkerThread;                  //the thread
};

//Pointer to an array of Task
Task* Tasks;

//Number of CPUs (logical cores) and number of worker tasks
int NumCpus;
int NumTask;

//Flag that is used to stop the workers when the computation is done
std::atomic<bool> WorkersRunning{ false };

//Main function for each worker
void TaskWorker(const int workerIndex)
{
    //Assign the worker to a specific logical core.
    //Skip the first one, because the scheduler is going to block that one.
    SetThreadAffinityMask(GetCurrentThread(), DWORD_PTR(1) << (workerIndex + 1));

    //Get pointer to the task struct for the current worker
    const auto task = Tasks + workerIndex;
    while (WorkersRunning)
    {
        while (!task->AssignedFlag && WorkersRunning); //Busy wait as long as no valid ComputeObject is set and the workers are not stopped
        if (!WorkersRunning) break;     //Get out of the loop when the workers are stopped
        task->ComputeObject->Compute(); //Do the computation
        task->AssignedFlag = false;     //Invalidate the current ComputeObject, so that a new one can be assigned by the scheduler
    }
}

//The scheduler runs on the main thread and constantly checks whether workers are finished with their ComputeObject and assigns new ones
void TaskScheduler(ComputeObject* computeObjects, const int numComputeObjects)
{
    const auto computeObjectsStart = computeObjects;
    const auto computeObjectsEnd = computeObjects + numComputeObjects;
    const auto tasksStart = Tasks;
    const auto tasksEnd = Tasks + NumTask;

    auto currentComputeObject = computeObjectsStart;
    auto currentTask = tasksStart;

    //as long as there are still ComputeObjects to be processed
    while (currentComputeObject != computeObjectsEnd)
    {
        if (!currentTask->AssignedFlag) //if the current task has no valid ComputeObject yet
        {
            currentTask->ComputeObject = currentComputeObject++; //assign a new ComputeObject and advance
            currentTask->AssignedFlag = true;                    //set the flag to signal that a ComputeObject has been assigned
        }

        currentTask++;                                         //advance to the next task
        if (currentTask == tasksEnd) currentTask = tasksStart; //go back to the first task if the last task was reached
    }
}

int main()
{
    //get the number of logical cores
    NumCpus = int(std::thread::hardware_concurrency());
    NumTask = NumCpus - 1; //the first core runs this thread and is going to be blocked by the scheduler
    Tasks = new Task[NumTask];

    const auto numParallelWork = 32;     //number of computations that can be done in parallel
    const auto numInvocations = 1000000; //number of invocations for the time measurement

    //create the ComputeObjects array and compute start/end pointers
    const auto computeObjects = new ComputeObject[numParallelWork];
    const auto computeObjectsStart = computeObjects;
    const auto computeObjectsEnd = computeObjects + numParallelWork;

    //fill the ComputeObjects with random data
    for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
    {
        computeObject->A = float(rand()) / RAND_MAX;
        computeObject->B = float(rand()) / RAND_MAX;
    }

    //set the workers running
    WorkersRunning = true;

    //spawn the workers
    for (auto i = 0; i < NumTask; i++)
        Tasks[i].WorkerThread = std::thread(TaskWorker, i);

    //put this thread on the first logical core
    SetThreadAffinityMask(GetCurrentThread(), 1 << 0);

    //wait 20s to allow Windows to actually move the threads to the specified cores
    //monitor the task manager to ensure Windows actually did that
    Sleep(20000);

    std::chrono::steady_clock::time_point start, end;
    std::chrono::duration<double> elapsed;

    start = std::chrono::steady_clock::now(); //start time measurement

    //invoke the task scheduler a few times
    for (auto i = 0; i < numInvocations; i++)
        TaskScheduler(computeObjects, numParallelWork);

    end = std::chrono::steady_clock::now(); //end time measurement
    elapsed = end - start;
    std::cout << "parallel: " << elapsed.count() << "s" << std::endl;

    //stop the workers and wait for all threads
    WorkersRunning = false;
    for (auto i = 0; i < NumTask; i++) Tasks[i].WorkerThread.join();

    //wait 10 seconds just for good measure
    Sleep(10000);

    start = std::chrono::steady_clock::now(); //start time measurement

    //invoke the sequential loop a few times
    for (auto i = 0; i < numInvocations; i++)
        for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
            computeObject->Compute();

    end = std::chrono::steady_clock::now(); //end time measurement
    elapsed = end - start;
    std::cout << "sequential: " << elapsed.count() << "s" << std::endl;

    //clean up
    delete[] Tasks;
    delete[] computeObjects;
}