I have the following code which is used to Push and Pend from a queue. The caller code has multiple MsgQ objects. It is possible that the Push and the Pend functions are waiting on the _notFull->wait()
and the _notEmpty->wait()
conditional waits. These waits are protected by the _mut mutex. The notFull and the notEmpty waits operate on the empty
and full
variables.
When the destructor is called, the _deleteQueue
is called internally, from which I would like to signal to the waiting threads to cleanup and stop waiting for a signal to come. Once that is done, I delete my objects. However, in the _deleteQueue
function, when I attempt to do _mut->acquire()
, I am unable to acquire the mutex. Even if I ignore the acquire, I am unable to broadcast
to these waiting threads. Where am I going wrong?
Thanks, Vikram.
MsgQ::~MsgQ()
{
_deleteQueue();
delete _mut;_mut=NULL;
delete _notFull;_notFull=NULL;
delete _notEmpty;_notEmpty=NULL;
delete _PostMutex; _PostMutex = NULL;
delete _PendMutex; _PendMutex = NULL;
delete _PostInProgressMutex; _PostInProgressMutex = NULL;
delete _PendInProgressMutex; _PendInProgressMutex = NULL;
delete _DisconnectMutex; _DisconnectMutex = NULL;
free( _ptrQueue ); _ptrQueue = NULL;
}
int MsgQ::Post(Message* msg)
{
_PostMutex->acquire();
_postInProgress++;
_PostMutex->release();
if (msg)
msg->print();
_mut->acquire();
while (full)
{
_notFull->wait();
}
if (!_disconnectInProgress)
_queuePush(msg);
_mut->release();
_PostMutex->acquire();
_postInProgress--;
if (_postInProgress==0)
{
_PostInProgressMutex->signal();
}
_PostMutex->release();
return _notEmpty->signal();
}
int MsgQ::Pend(Message*& msg)
{
_PendMutex->acquire();
_pendInProgress++;
_PendMutex->release();
_mut->acquire();
while (empty)
_notEmpty->wait();
if (!_disconnectInProgress)
{
_queuePop(msg);
}
_mut->release();
_PendMutex->acquire();
_pendInProgress--;
if (_pendInProgress == 0)
{
_PendInProgressMutex->signal();
}
_PendMutex->release();
return _notFull->signal();
}
void MsgQ::_deleteQueue ()
{
_PostMutex->acquire();
if (_postInProgress != 0)
{
_PostMutex->release();
TRACE("Acquiring Mutex.");
_mut->acquire();
full = 0;
_notFull->broadcast();
_mut->release();
_PostInProgressMutex->wait();
}
else
{
_PostMutex->release();
}
_PendMutex->acquire();
if (_pendInProgress != 0)
{
_PendMutex->release();
TRACE("Acquiring Mutex.");
_mut->acquire();
empty = 0;
_notEmpty->broadcast();
_mut->release();
_PendInProgressMutex->wait();
}
else
{
_PendMutex->release();
}
}
void MsgQ::_initQueue()
{
_ptrQueue = (Message **)(malloc (size * sizeof (Message*)));
if (_ptrQueue == NULL)
{
cout << "queue could not be created!" << endl;
}
else
{
for (int i = 0; i < size; i++)
*(_ptrQueue + i) = NULL;
empty = 1;
full = 0;
head = 0;
tail = 0;
try{
_mut = new ACE_Mutex() ;
_notFull = new ACE_Condition<ACE_Mutex>(*_mut);
_notEmpty = new ACE_Condition<ACE_Mutex>(*_mut);
_PostMutex = new ACE_Mutex();
_PendMutex = new ACE_Mutex();
_PostInProgressMutex = new ACE_Condition<ACE_Mutex>(*_PostMutex);
_PendInProgressMutex = new ACE_Condition<ACE_Mutex>(*_PendMutex);
_DisconnectMutex = new ACE_Mutex();
_postInProgress = 0;
_pendInProgress = 0;
_disconnectInProgress = false;
}catch(...){
cout << "you should not be here" << endl;
}
}
}
There seem to be many problems with the code so I would suggest reworking it:
_mut
before you go into a wait condition in both Post
and Pend
functions.acquire
and release
on a ACE_Mutex
I would suggest looking at using ACE_Guard
class which can acquire
mutex when created and release it when destroyed.ACE_Message_Queue
instead of creating your own?