
modified producer consumer algorithm


I am working on a producer/consumer problem with a modification. However, my design has a race condition, and I am debating the best way to handle it. There may be cleaner approaches, so I wonder whether anyone has done something similar and could share a better solution.

It starts as a normal producer/consumer setup using a queue. One producer thread reads items from disk and enqueues them on a shared queue. Multiple consumer threads then try to dequeue the items for processing. However, each item has a tag (like a thread ID) that MUST match the consumer's tag. A consumer thread looks at the front of the queue and checks the tag of the item. If it does not match the consumer's own tag, the consumer must go to sleep and wait until the front of the queue holds an item that matches its tag. This is a bit confusing, but the pseudo-code below hopefully clarifies the algorithm:

struct item
{
    // This is a unique tag; only the consumer with the same tag can consume this item
    int    consumerTag;
    // data for the consumer to consume
    void   *pData;
};

///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
    lock(queue);
    while (queueFull)
    {
        // Queue is full: wait for a consumer to make room.
        // Sleep() releases the queue mutex on entry and
        // re-acquires it after the thread has been awakened.
        Sleep();
    }
    enqueue(item);
    unlock(queue);
    wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is like a thread id;
// each consumer has a unique tag
myTag = getThreadTAG();
while (true)
{
    lock (queue);
    if (queueNotEmpty)
    {
        item = queueFront();
        if (myTag == item->consumerTag)
        {
           // this item is for me, let's dequeue and process
           item = dequeue();
           process();
        }
        else
        {
           // This is not for me let's go to sleep
           Sleep(); // Releases Queue Mutex
          // re-acquire mutex
        }
    }
    else
    {
        Sleep();    // Releases Queue Mutex
       // re-acquire mutex
    }

    unlock (queue);
    wakeUpProducer();
}

However, there are problems with the algorithm above. Consider the following sequence of events, using this notation:

item.tag=1 means the item must be consumed only by the consumer with the same tag, which I will write as consumer.tag=1.

  1. Producer reads item.tag=1 and enqueues
  2. Producer wakes up all consumer threads (consumer.tag=1, consumer.tag=2, etc... are all awake now and checking the front of the queue)
  3. Producer reads item.tag=2 and enqueues
  4. Producer wakes up all consumer threads
  5. queue now has [item.tag=1, item.tag=2]
  6. consumer.tag=2 wakes up and peeks at the front of the queue, but item.tag=1 does not match consumer.tag=2; therefore, it goes back to sleep. consumer.tag=2 is sleeping now.
  7. consumer.tag=1 wakes up and peeks at the front of the queue, and item.tag=1 matches consumer.tag=1. It dequeues the item and notifies the producer that it can enqueue more.
  8. The producer finishes reading the data and exits. Now the queue has item.tag=2, but consumer.tag=2 is sleeping and never consumes that data. Note that there can be many consumers, so at the end many consumers can end up sleeping while the queue still holds items.

I thought of just adding a loop at the end of the producer thread that keeps waking up all the sleeping threads until the queue is empty.

// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
     wakeUpAllConsumers();
     Sleep();
}

But I believe there must be a more elegant way to handle this problem. If you have any ideas, let me know.

thx!


Solution

  • I encountered something similar a while back (in a setup where all threads could handle all items). The solution I used there, although not all that elegant, was to wake everyone up one last time when the producer finished reading the data.
    Here, that will not really work: if there is a third item in your queue, it might get left behind. I would suggest one of two approaches:

    1. Wake up all the threads EVERY TIME a consumer dequeues an item. This is the only way I can think of to make sure that nothing gets left behind. (This mode could be enabled only when isProducerFinishedReading == true, to save some resources/time.)

    2. Redesign the system to have 10 queues (one per consumer), so that when an item is added to queue n, consumer thread n is awoken. When it is finished with the item, it checks its queue again for a new item to deal with. In any case, the producer should check the length of all queues when the reading is done and wake the corresponding threads.
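
For what it's worth, option 2 above could look roughly like the Python sketch below. It is only an illustration under my own assumptions: `queue.Queue` already blocks and wakes the waiting consumer, so the explicit "wake thread n" step becomes implicit, and `SENTINEL`, `consumer` and `run_per_tag_queues` are made-up names, not anything from the question.

```python
import queue
import threading

# Hypothetical end-of-stream marker (an assumption of this sketch).
SENTINEL = object()


def consumer(inbox, out):
    # Each consumer blocks on its own queue, so it can never be stuck
    # behind an item that belongs to a different consumer.
    while True:
        data = inbox.get()
        if data is SENTINEL:
            return
        out.append(data)  # stand-in for process()


def run_per_tag_queues(items, tags, maxsize=4):
    # One bounded queue and one private output list per consumer tag.
    inboxes = {tag: queue.Queue(maxsize=maxsize) for tag in tags}
    outputs = {tag: [] for tag in tags}
    workers = [
        threading.Thread(target=consumer, args=(inboxes[tag], outputs[tag]))
        for tag in tags
    ]
    for w in workers:
        w.start()
    # Producer: route every item to the queue that matches its tag.
    for tag, data in items:
        inboxes[tag].put(data)
    # Reading is done: tell every consumer to stop once its queue drains.
    for tag in tags:
        inboxes[tag].put(SENTINEL)
    for w in workers:
        w.join()
    return outputs
```

Calling `run_per_tag_queues([(1, "a"), (2, "b"), (1, "c")], [1, 2])` hands "a" and "c" to consumer 1 and "b" to consumer 2, each in the order the producer read them. The trade-off is that ordering is only preserved per tag, not across tags.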

    Hope that helped.

    EDIT:

    1. Every time a thread finishes working, it should check the queue again, and if the item at the front is "its own", it does the work. If a thread can wake up other threads, then it should wake up the corresponding one.
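
If you would rather keep the single shared queue, option 1 combined with the re-check from the EDIT could be sketched in Python like this. Again, this is a rough sketch under my own assumptions, not the asker's actual code: it uses one `threading.Condition` for everything, wakes all waiters after every enqueue and dequeue, and assumes every tag in the queue has a live consumer. `TaggedQueue`, `DONE`, `worker` and `run_shared_queue` are hypothetical names.

```python
import threading
from collections import deque

# Hypothetical marker returned when no more items will ever arrive.
DONE = object()


class TaggedQueue:
    """Shared FIFO; only the consumer whose tag matches the front item may dequeue."""

    def __init__(self, capacity):
        self.items = deque()
        self.capacity = capacity
        self.closed = False
        self.cond = threading.Condition()

    def put(self, tag, data):
        with self.cond:
            # Wait in a loop so the item is never dropped when the queue is full.
            while len(self.items) >= self.capacity:
                self.cond.wait()
            self.items.append((tag, data))
            self.cond.notify_all()

    def close(self):
        # Called by the producer once it has finished reading.
        with self.cond:
            self.closed = True
            self.cond.notify_all()

    def get(self, my_tag):
        with self.cond:
            # Sleep until the front item carries my tag.
            while not (self.items and self.items[0][0] == my_tag):
                if self.closed and not self.items:
                    return DONE  # producer finished and nothing is left
                self.cond.wait()
            _, data = self.items.popleft()
            # The front changed, so wake everyone: the new front's owner and
            # a producer blocked on a full queue both need to re-check.
            self.cond.notify_all()
            return data


def worker(q, tag, out):
    # After each item, go straight back and check the queue again.
    while True:
        data = q.get(tag)
        if data is DONE:
            return
        out.append(data)  # stand-in for process(), done outside the lock


def run_shared_queue(items, tags, capacity=2):
    q = TaggedQueue(capacity)
    outputs = {tag: [] for tag in tags}
    threads = [threading.Thread(target=worker, args=(q, tag, outputs[tag])) for tag in tags]
    for t in threads:
        t.start()
    for tag, data in items:
        q.put(tag, data)
    q.close()
    for t in threads:
        t.join()
    return outputs
```

Because every state change (enqueue, dequeue, close) notifies all waiters, and each waiter re-checks its condition under the lock, no consumer should be left sleeping on an item that is already at the front. The cost is extra wake-ups for threads whose tag does not match.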