I implemented a concurrent queue with two methods: add (enqueue) & remove (dequeue).
To test my implementation using 2 threads, I generated 10 (NUMBER_OF_OPERATIONS) random numbers between 0 and 1 in a method called getRandom(). This lets me create different distributions of add and remove operations.
The doWork method splits the operations evenly among the threads.
PROBLEM: The threadID that I am passing in from the main function does not match the threadID that the doWork method receives. Here are some sample runs:
    #define NUMBER_OF_THREADS 2
    #define NUMBER_OF_OPERATIONS 10

    int main () {
        BoundedQueue<int> bQ;
        std::vector<double> temp = getRandom();
        double* randomNumbers = &temp[0];
        std::thread myThreads[NUMBER_OF_THREADS];
        for(int i = 0; i < NUMBER_OF_THREADS; i++) {
            cout << "Thread " << i << " created.\n";
            myThreads[i] = std::thread ( [&] { bQ.doWork(randomNumbers, i); });
        }
        cout << "Main Thread\n";
        for(int i = 0; i < NUMBER_OF_THREADS; i++) {
            if(myThreads[i].joinable()) myThreads[i].join();
        }
        return 0;
    }

    template <class T> void BoundedQueue<T>::doWork (double randomNumbers[], int threadID) {
        cout << "Thread ID is " << threadID << "\n";
        srand(time(NULL));
        int split = NUMBER_OF_OPERATIONS / NUMBER_OF_THREADS;
        for (int i = threadID * split; i < (threadID * split) + split; i++) {
            if(randomNumbers[i] <= 0.5) {
                int numToAdd = rand() % 10 + 1;
                add(numToAdd);
            }
            else {
                int numRemoved = remove();
            }
        }
    }

In this line you're capturing i by reference:

    myThreads[i] = std::thread ( [&] { bQ.doWork(randomNumbers, i); });

This means that when the other thread runs the lambda, it reads whatever value i holds at that moment, not the value i had when the thread was created. Capture it by value instead:

    myThreads[i] = std::thread ( [&, i] { bQ.doWork(randomNumbers, i); });

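What's worse, because the main thread writes to i while the other threads read it with no synchronization, your current code has a data race, which is undefined behavior. On top of that, i may already have gone out of scope on the main thread before the other thread reads it, leaving the lambda with a dangling reference. Capturing i by value fixes all of these issues.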