Search code examples
c++multithreadingmutexsynchronize

How can I synchronize three threads?


My app consist of the main-process and two threads, all running concurrently and making use of three fifo-queues:

The fifo-q's are Qmain, Q1 and Q2. Internally the queues each use a counter that is incremented when an item is put into the queue, and decremented when an item is 'get'ed from the queue.

The processing involve two threads,
QMaster, which get from Q1 and Q2, and put into Qmain,
Monitor, which put into Q2,
and the main process, which get from Qmain and put into Q1.

The QMaster-thread loop consecutively checks the counts of Q1 and Q2 and if any items are in the q's, it get's them and puts them into Qmain.

The Monitor-thread loop obtains data from external sources, package it and put it into Q2.

The main-process of the app also runs a loop checking the count of Qmain, and if any items, get's an item from Qmain at each iteration of the loop and process it further. During this processing it occasionally puts an item into Q1 to be processed later (when it is get'ed from Qmain in turn).

The problem:
I've implemented all as described above, and it works for a randomly (short) time and then hangs. I've managed to identify the source of the crashing to happen in the increment/decrement of the count of a fifo-q (it may happen in any of them).

What I've tried:
Using three mutex's: QMAIN_LOCK, Q1_LOCK and Q2_LOCK, which I lock whenever any get/put operation is done on a relevant fifo-q. Result: the app doesn't get going, just hangs.

The main-process must continue running all the time, must not be blocked on a 'read' (named-pipes fail, socketpair fail).

Any advice?
I think I'm not implementing the mutex's properly, how should it be done?
(Any comments on improving the above design also welcome)

[edit] below are the processes and the fifo-q-template:
Where & how in this should I place the mutex's to avoid the problems described above?

main-process:
...
start thread QMaster
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X) 
            delete X;
    }
    ...
    //at some random time:
    Q2.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        Q2.put(data)
    }
}

QMaster:
{
    while(1)
    {
        if (Q1.count() > 0)
            Qmain.put(Q1.get());

        if (Q2.count() > 0)
            Qmain.put(Q2.get());
    }
}

fifo_q:
template < class X* > class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    }
    item *head, *tail;
    int count;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X x) { item i=new item(); (... adds to tail...); count++; }
    X* get() { X *d = h.data; (...deletes head ...); count--; return d; }
    clear() {...}
};

Solution

  • An example of how I would adapt the design and lock the queue access the posix way. Remark that I would wrap the mutex to use RAII or use boost-threading and that I would use stl::deque or stl::queue as queue, but staying as close as possible to your code:

    main-process:
    ...
    start thread Monitor
    ...
    while (!quit)
    {
        ...
        if (Qmain.count() > 0)
        {
            X = Qmain.get();
            process(X) 
                delete X;
        }
        ...
        //at some random time:
        QMain.put(Y);
        ...
    }
    
    Monitor:
    {
        while (1)
        {
            //obtain & package data
            QMain.put(data)
        }
    }
    
    fifo_q:
    template < class X* > class fifo_q
    {
        struct item
        {
            X* data;
            item *next;
            item() { data=NULL; next=NULL; }
        }
        item *head, *tail;
        int count;
        pthread_mutex_t m;
    public:
        fifo_q() { head=tail=NULL; count=0; }
        ~fifo_q() { clear(); /*deletes all items*/ }
        void put(X x) 
        { 
          pthread_mutex_lock(&m);
          item i=new item(); 
          (... adds to tail...); 
          count++; 
          pthread_mutex_unlock(&m);
        }
        X* get() 
        { 
          pthread_mutex_lock(&m);
          X *d = h.data; 
          (...deletes head ...); 
          count--; 
          pthread_mutex_unlock(&m);
          return d; 
        }
        clear() {...}
    };
    

    Remark too that the mutex still needs to be initialized as in the example here and that count() should also use the mutex