Search code examples
ace

Unable to acquire a mutex held by a ACE_Condition wait


I have the following code which is used to Push and Pend from a queue. The caller code has multiple MsgQ objects. It is possible that the Push and the Pend functions are waiting on the _notFull->wait() and the _notEmpty->wait() conditional waits. These waits are protected by the _mut mutex. The notFull and the notEmpty waits operate on the empty and full variables.

When the destructor is called, the _deleteQueue is called internally, from which I would like to signal to the waiting threads to cleanup and stop waiting for a signal to come. Once that is done, I delete my objects. However, in the _deleteQueue function, when I attempt to do _mut->acquire(), I am unable to acquire the mutex. Even if I ignore the acquire, I am unable to broadcast to these waiting threads. Where am I going wrong?

Thanks, Vikram.

    MsgQ::~MsgQ()
{
    _deleteQueue();

    delete _mut;_mut=NULL;
    delete  _notFull;_notFull=NULL;
    delete _notEmpty;_notEmpty=NULL;
    delete _PostMutex; _PostMutex = NULL;
    delete _PendMutex; _PendMutex = NULL;
    delete _PostInProgressMutex; _PostInProgressMutex = NULL;
    delete _PendInProgressMutex; _PendInProgressMutex = NULL;
    delete _DisconnectMutex; _DisconnectMutex = NULL;
    free( _ptrQueue ); _ptrQueue = NULL;
}

int MsgQ::Post(Message* msg)
{
    _PostMutex->acquire();
    _postInProgress++;
    _PostMutex->release();

    if (msg)
    msg->print();

    _mut->acquire();
    while (full) 
    {
    _notFull->wait();
    }

    if (!_disconnectInProgress)
    _queuePush(msg);
    _mut->release();

    _PostMutex->acquire();
    _postInProgress--;
    if (_postInProgress==0) 
    {
    _PostInProgressMutex->signal();
    }
    _PostMutex->release();

    return _notEmpty->signal();
}

int MsgQ::Pend(Message*& msg)
{

    _PendMutex->acquire();
    _pendInProgress++;
    _PendMutex->release();

    _mut->acquire();
    while (empty) 
    _notEmpty->wait();

    if (!_disconnectInProgress)
    {
    _queuePop(msg);
    }
    _mut->release();

    _PendMutex->acquire();
    _pendInProgress--;
    if (_pendInProgress == 0)
    {
    _PendInProgressMutex->signal();
    }
    _PendMutex->release();

    return _notFull->signal();
}

void MsgQ::_deleteQueue ()
{
    _PostMutex->acquire();
    if (_postInProgress != 0)
    {
    _PostMutex->release();
    TRACE("Acquiring Mutex.");
    _mut->acquire();
    full = 0;
    _notFull->broadcast();
    _mut->release();
    _PostInProgressMutex->wait();
    }
    else
    {
    _PostMutex->release();
    }

    _PendMutex->acquire();
    if (_pendInProgress != 0)
    {
    _PendMutex->release();
    TRACE("Acquiring Mutex.");
    _mut->acquire();
    empty = 0;
    _notEmpty->broadcast();
    _mut->release();
    _PendInProgressMutex->wait();
    }
    else
    {
    _PendMutex->release();
    }
}

void MsgQ::_initQueue()
{
    _ptrQueue = (Message **)(malloc (size * sizeof (Message*)));

    if (_ptrQueue == NULL)
    {
    cout << "queue could not be created!" << endl;
    }
    else
    {
    for (int i = 0; i < size; i++)
        *(_ptrQueue + i) = NULL;
    empty = 1;
    full = 0;
    head = 0;
    tail = 0;

        try{
        _mut = new ACE_Mutex() ;
        _notFull = new ACE_Condition<ACE_Mutex>(*_mut);
        _notEmpty = new ACE_Condition<ACE_Mutex>(*_mut); 

    _PostMutex = new ACE_Mutex();
    _PendMutex = new ACE_Mutex();
    _PostInProgressMutex = new ACE_Condition<ACE_Mutex>(*_PostMutex);
    _PendInProgressMutex = new ACE_Condition<ACE_Mutex>(*_PendMutex);
    _DisconnectMutex = new ACE_Mutex();

    _postInProgress = 0;
    _pendInProgress = 0;
    _disconnectInProgress = false;

        }catch(...){
            cout << "you should not be here" <<  endl;
        }
    }
}

Solution

  • There seem to be many problems with the code so I would suggest reworking it:

    1. You have a potential for deadlock because you are acquiring _mut before you go into a wait condition in both Post and Pend functions.
    2. Instead of using acquire and release on a ACE_Mutex I would suggest looking at using ACE_Guard class which can acquire mutex when created and release it when destroyed.
    3. Why not use ACE_Message_Queue instead of creating your own?