I am working on a producer/consumer problem with a twist. My current approach has a race condition, and I am debating the best way to fix it. There may be cleaner approaches; if anyone has done something similar, I would appreciate a better solution.
It starts as a normal producer/consumer setup using a queue. One producer thread reads items from disk and enqueues them on a shared queue. Multiple consumer threads then try to dequeue the items for processing. However, each item has a tag (like a thread ID) that MUST match the consumer's tag. A consumer thread looks at the front of the queue and checks the item's tag. If it does not match the consumer's own tag, the consumer must go to sleep and wait until the front of the queue holds an item that matches its tag. This is a bit confusing, but the pseudo-code below hopefully clarifies the algorithm:
struct item
{
    // This is a unique tag; only the consumer with the same tag can consume this item
    int consumerTag;
    // data for the consumer to consume
    void *pData;
};
///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag for a specific consumer
while (item = read())
{
    lock(queue);
    if (queueNotFull)
    {
        enqueue(item);
    }
    else
    {
        // check front of the queue, notify worker.
        Sleep(); // Releases the queue mutex upon entering,
                 // re-acquires the mutex after being awakened
    }
    unlock(queue);
    wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is like a thread id,
// each consumer has a unique tag
myTag = getThreadTAG();
while (true)
{
    lock(queue);
    if (queueNotEmpty)
    {
        item = queueFront();
        if (myTag == item->consumerTag)
        {
            // this item is for me, let's dequeue and process
            item = dequeue();
            process();
        }
        else
        {
            // This is not for me, let's go to sleep
            Sleep(); // Releases the queue mutex,
                     // re-acquires it on wake-up
        }
    }
    else
    {
        Sleep(); // Releases the queue mutex,
                 // re-acquires it on wake-up
    }
    unlock(queue);
    wakeUpProducer();
}
However, there are problems with the algorithm above. Consider the following sequence of events, where item.tag=1 means the item must be consumed only by the consumer with the same tag, which I will write as consumer.tag=1:
1. The producer reads item.tag=1 and enqueues it.
2. The producer wakes up all consumers (consumer.tag=1, consumer.tag=2, etc. are all awake now and checking the front of the queue).
3. The producer reads item.tag=2 and enqueues it.
4. The queue now holds [item.tag=1, item.tag=2].
5. consumer.tag=2 wakes up and peeks at the front of the queue, but finds item.tag=1, which does not match consumer.tag=2; therefore, it goes back to sleep. consumer.tag=2 is sleeping now.
6. consumer.tag=1 wakes up and peeks at the front of the queue, and finds item.tag=1, which matches consumer.tag=1. It dequeues the item and notifies the producer that it can enqueue more.
7. The queue now holds item.tag=2, but consumer.tag=2 is sleeping and never consumes that data.
Note that there can be many consumers, so at the end many of them can end up sleeping while the queue still holds items. I thought of simply adding, at the end of the producer thread, a loop that keeps waking up all the sleeping threads until the queue is empty:
// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
    wakeUpAllConsumers();
    Sleep();
}
But I believe there must be a more elegant way to handle this problem. If you have any ideas, let me know.
Thanks!
I encountered something similar a while back (in a setup where all threads could deal with all items), and the solution I used there, even though not all that elegant, was to wake everyone up one last time when the producer finished reading the data.
Here, this will not really work: if you have a third item in your queue, that item might get left behind. What I would suggest is one of two ways:
1. Wake up all the threads EVERY TIME a consumer dequeues an item. This is the only way I can think of to make sure that nothing gets left behind. (This mode could be enabled only when isProducerFinishedReading == true, to save some resources/time.)
2. Re-design the system to have 10 queues (one per consumer tag), so that when an item is added to queue n, consumer thread n is awoken. When it is finished with the element, it checks its queue again for a new item to deal with. In any case, the producer should check the length of all queues when the reading is done and wake the corresponding threads.
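To make the first suggestion concrete, here is a rough C++ sketch, not a drop-in implementation. It assumes a single std::condition_variable shared by the producer and all consumers, and the names TaggedQueue, popFor, close and runBroadcastDemo are made up for illustration. The key points are that every wait re-checks its condition in a predicate, and that a consumer calls notify_all after every dequeue so the owner of the new front item gets a chance to run.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

struct Item { int consumerTag; int payload; };

// Hypothetical bounded queue: only the consumer whose tag matches the
// front item may dequeue it.
class TaggedQueue {
public:
    explicit TaggedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Producer: block while the queue is full, enqueue, wake everyone.
    void push(Item item) {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [&] { return items_.size() < capacity_; });
        items_.push_back(item);
        changed_.notify_all();
    }

    // Producer: call once after the last push.
    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
        changed_.notify_all();
    }

    // Consumer: wait until the front item carries myTag.
    // Returns false once the queue is closed and fully drained.
    bool popFor(int myTag, Item& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [&] {
            return (!items_.empty() && items_.front().consumerTag == myTag)
                || (closed_ && items_.empty());
        });
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        // The front changed: wake ALL waiters (other consumers and the
        // producer), not just the producer.
        changed_.notify_all();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    std::deque<Item> items_;
    std::size_t capacity_;
    bool closed_ = false;
};

// Demo: the calling thread acts as the producer; returns items consumed per tag.
std::vector<int> runBroadcastDemo(int consumers, int itemCount) {
    TaggedQueue queue(4);
    std::vector<int> consumed(consumers, 0);
    std::vector<std::thread> workers;
    for (int tag = 0; tag < consumers; ++tag) {
        workers.emplace_back([&queue, &consumed, tag] {
            Item item{};
            while (queue.popFor(tag, item)) ++consumed[tag];  // each thread owns one slot
        });
    }
    for (int i = 0; i < itemCount; ++i) queue.push(Item{i % consumers, i});
    queue.close();
    for (auto& w : workers) w.join();
    return consumed;
}
```

Note that this sketch assumes every tag that appears in the input has a live consumer; an item whose consumer never shows up would block the front of the queue forever.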
Hope that helped.
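For the second suggestion (one queue per consumer), a minimal C++ sketch could look like the following. The names Mailbox and runPerTagDemo are hypothetical, and for brevity the per-consumer queues are unbounded, unlike the bounded queue in the question. Each consumer sleeps only on its own queue, so a wake-up can no longer be "wasted" on a thread that cannot take the front item.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

struct Item { int consumerTag; int payload; };

// Hypothetical per-consumer queue: one mutex and one condition variable each.
struct Mailbox {
    std::mutex mutex;
    std::condition_variable ready;
    std::deque<Item> items;
    bool closed = false;

    // Producer: enqueue and wake the single owner of this mailbox.
    void push(Item item) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            items.push_back(item);
        }
        ready.notify_one();
    }

    // Producer: signal that no more items will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            closed = true;
        }
        ready.notify_one();
    }

    // Consumer: returns false once the mailbox is closed and drained.
    bool pop(Item& out) {
        std::unique_lock<std::mutex> lock(mutex);
        ready.wait(lock, [this] { return !items.empty() || closed; });
        if (items.empty()) return false;
        out = items.front();
        items.pop_front();
        return true;
    }
};

// Demo: the calling thread acts as the producer and routes each item
// to the mailbox of its tag; returns items consumed per tag.
std::vector<int> runPerTagDemo(int consumers, int itemCount) {
    std::vector<Mailbox> boxes(consumers);
    std::vector<int> consumed(consumers, 0);
    std::vector<std::thread> workers;
    for (int tag = 0; tag < consumers; ++tag) {
        workers.emplace_back([&boxes, &consumed, tag] {
            Item item{};
            while (boxes[tag].pop(item)) ++consumed[tag];  // each thread owns one slot
        });
    }
    for (int i = 0; i < itemCount; ++i) boxes[i % consumers].push(Item{i % consumers, i});
    for (auto& box : boxes) box.close();  // reading is done: wake every consumer one last time
    for (auto& w : workers) w.join();
    return consumed;
}
```

The close() call plays the role of the final "check all queues and wake the corresponding threads" step: a consumer drains whatever is left in its mailbox before it sees the closed flag and exits.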
EDIT: