I have implemented an interprocess message queue in shared memory for one producer and one consumer on Windows.
I am using one named semaphore to count empty slots, one named semaphore to count full slots and one named mutex to protect the data structure in shared memory.
Consider, for example, the consumer side (the producer side is similar). It first waits on the full semaphore, then (1) takes a message from the queue while holding the mutex, and then (2) signals the empty semaphore.
The problem:
If the consumer process crashes between (1) and (2), the number of queue slots available to the two processes is effectively reduced by one. Assume that while the consumer is down, the producer can cope with the queue filling up (it can specify a timeout when waiting on the empty semaphore, or even a timeout of 0 so that it does not wait at all).
When the consumer restarts, it can continue reading from the queue. No data will have been overwritten, but even after the consumer empties all full slots, the producer will have one fewer empty slot to use.
After enough such restarts, the queue will have no usable slots left and no messages can be sent.
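To make the leak concrete, here is a minimal single-threaded C model of the two counting semaphores. It is not the Windows API; `sem_model` and the `model_*` functions are hypothetical. A consumer that crashes between (1) and (2) has already taken one count from the full semaphore but never returns one to the empty semaphore, so each such crash permanently removes a slot from circulation.

```c
#include <assert.h>
#include <stdbool.h>

#define QUEUE_LENGTH 4 /* illustrative size */

/* Hypothetical model of the two counting semaphores (not the Windows API). */
typedef struct {
    int empty; /* free slots; the producer waits on this */
    int full;  /* filled slots; the consumer waits on this */
} sem_model;

static void model_init(sem_model *m)
{
    m->empty = QUEUE_LENGTH;
    m->full = 0;
}

/* Producer: wait(empty), write the slot, signal(full). False if it would block. */
static bool model_produce(sem_model *m)
{
    if (m->empty == 0)
        return false;
    m->empty--;
    m->full++;
    return true;
}

/* Consumer: wait(full), (1) take the message, (2) signal(empty).
   With crash_after_take == true the process dies before step (2). */
static bool model_consume(sem_model *m, bool crash_after_take)
{
    if (m->full == 0)
        return false;
    m->full--;          /* wait on the full semaphore, then (1) */
    if (!crash_after_take)
        m->empty++;     /* (2) signal the empty semaphore */
    return true;
}

/* Slots the producer can still use once the queue has been drained. */
static int model_usable_slots(const sem_model *m)
{
    assert(m->empty >= 0 && m->full >= 0);
    assert(m->empty + m->full <= QUEUE_LENGTH);
    return m->empty + m->full;
}
```

Running one produce/crashing-consume cycle per restart drives `model_usable_slots()` down by one each time until the producer can no longer send anything.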
Question:
How can this situation be avoided or recovered from?
Here's an outline of one simple approach, using events rather than semaphores:
```c
#include <windows.h>
#include <intrin.h>   // _ReadWriteBarrier

// Declared elsewhere (see the notes below): QUEUE_LENGTH, read_offset,
// write_offset, signal_consumer_event, signal_producer_event,
// consume() and produce().

DWORD increment_offset(DWORD offset)
{
    offset++;
    if (offset == QUEUE_LENGTH * 2) offset = 0;
    return offset;
}

void consumer(void)
{
    for (;;)
    {
        DWORD current_write_offset = InterlockedCompareExchange(write_offset, 0, 0);
        if ((current_write_offset != *read_offset + QUEUE_LENGTH) &&
            (current_write_offset + QUEUE_LENGTH != *read_offset))
        {
            // Queue is not full, make sure producer is awake
            SetEvent(signal_producer_event);
        }
        if (*read_offset == current_write_offset)
        {
            // Queue is empty, wait for producer to add a message
            WaitForSingleObject(signal_consumer_event, INFINITE);
            continue;
        }
        MemoryBarrier();
        _ReadWriteBarrier();
        consume((*read_offset) % QUEUE_LENGTH);
        InterlockedExchange(read_offset, increment_offset(*read_offset));
    }
}

void producer(void)
{
    for (;;)
    {
        DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
        if (current_read_offset != *write_offset)
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal_consumer_event);
        }
        if ((*write_offset == current_read_offset + QUEUE_LENGTH) ||
            (*write_offset + QUEUE_LENGTH == current_read_offset))
        {
            // Queue is full, wait for consumer to remove a message
            WaitForSingleObject(signal_producer_event, INFINITE);
            continue;
        }
        produce((*write_offset) % QUEUE_LENGTH);
        MemoryBarrier();
        _ReadWriteBarrier();
        InterlockedExchange(write_offset, increment_offset(*write_offset));
    }
}
```
Notes:
The code as posted compiles (given the appropriate declarations) but I have not otherwise tested it.
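`read_offset` is a pointer to a `DWORD` in shared memory indicating which slot should be read from next. Similarly, `write_offset` points to a `DWORD` in shared memory indicating which slot should be written to next. (The `Interlocked*` functions take a `LONG volatile *`, so in practice declare these pointers with that type or cast them when passing them in.)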
An offset of `QUEUE_LENGTH + x` refers to the same slot as an offset of `x`, so as to disambiguate between a full queue and an empty queue: the queue is empty when the two offsets are equal, and full when they differ by exactly `QUEUE_LENGTH` (same slot, but the writer is one lap ahead). That's why the `increment_offset()` function checks for `QUEUE_LENGTH*2` rather than just `QUEUE_LENGTH`, and why we take the modulo when calling the `consume()` and `produce()` functions. (One alternative to this approach would be to modify the producer to never use the last available slot, but that wastes a slot.)
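The offset arithmetic can be checked in isolation. This sketch restates `increment_offset()` with portable types (`uint32_t` instead of `DWORD`) and adds hypothetical helpers `slot_index()`, `queue_empty()` and `queue_full()` that mirror the conditions used in `producer()` and `consumer()`; `QUEUE_LENGTH` is 4 purely for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_LENGTH 4 /* illustrative size */

/* Offsets run from 0 to 2*QUEUE_LENGTH - 1; x and x + QUEUE_LENGTH
   name the same slot. */
static uint32_t increment_offset(uint32_t offset)
{
    assert(offset < 2 * QUEUE_LENGTH);
    offset++;
    if (offset == 2 * QUEUE_LENGTH)
        offset = 0;
    return offset;
}

/* Hypothetical helpers mirroring the checks in producer()/consumer(). */
static uint32_t slot_index(uint32_t offset)
{
    return offset % QUEUE_LENGTH;
}

static bool queue_empty(uint32_t read, uint32_t write)
{
    return read == write;
}

/* Full: same slot as the reader, but exactly one lap ahead. */
static bool queue_full(uint32_t read, uint32_t write)
{
    return write == read + QUEUE_LENGTH || write + QUEUE_LENGTH == read;
}
```

After the producer has written `QUEUE_LENGTH` messages without the consumer reading any, `slot_index()` of both offsets is the same, yet `queue_full()` is true and `queue_empty()` is false, so all slots are usable without ambiguity.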
`signal_consumer_event` and `signal_producer_event` must be automatic-reset events. Note that setting an event that is already set is a no-op.
The consumer only waits on its event if the queue is actually empty, and the producer only waits on its event if the queue is actually full.
When either process is woken, it must recheck the state of the queue, because there is a race condition that can lead to a spurious wakeup. (For example, the producer may set the consumer's event while the consumer is still busy; the consumer then drains the queue, and its next wait returns immediately even though the queue is empty.)
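The two event properties the algorithm relies on can be modelled without the Windows API. The sketch below is a hypothetical, non-blocking stand-in built on C11 atomics, not `SetEvent`/`WaitForSingleObject`: the event is a single flag, setting it twice is the same as setting it once, and a successful wait clears it. Because repeated signals collapse into one, a woken process cannot tell how much has changed, which is why the loops simply recheck the offsets.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical, non-blocking model of an auto-reset event. */
typedef struct {
    atomic_bool signaled;
} auto_reset_event;

static void event_init(auto_reset_event *e)
{
    atomic_init(&e->signaled, false);
}

/* Like SetEvent: setting an already-set event changes nothing. */
static void event_set(auto_reset_event *e)
{
    atomic_store(&e->signaled, true);
}

/* Like a zero-timeout wait on an auto-reset event: succeeds only if the
   event was set, and resets it in the same atomic step. */
static bool event_try_wait(auto_reset_event *e)
{
    return atomic_exchange(&e->signaled, false);
}
```

Two `event_set()` calls followed by two `event_try_wait()` calls yield one success and one failure: the second signal is lost, which is harmless here only because every wakeup is followed by a fresh look at the offsets.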
Because I use interlocked operations, and because only one process at a time is using any particular slot, there is no need for a mutex. I've included memory barriers so that the producer's writes to a slot become visible before the updated write offset does, and so that the consumer does not read a slot's contents before it has seen the write offset that publishes them. If you're not comfortable with lock-free code, you'll find that it is trivial to convert the algorithm shown to use a mutex instead.
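For readers on other platforms, the same publication ordering can be expressed with C11 atomics instead of `Interlocked*` calls and `MemoryBarrier()`. This is a swapped-in, portable sketch, shown within a single process for simplicity, with the event signalling left out; `spsc_queue`, `queue_push()` and `queue_pop()` are hypothetical names. A release store of an offset publishes the slot contents (or hands the slot back), and the matching acquire load on the other side guarantees those contents are seen.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_LENGTH 4 /* illustrative size */

/* Hypothetical single-producer/single-consumer ring using the same
   0 .. 2*QUEUE_LENGTH-1 offset scheme. */
typedef struct {
    _Atomic uint32_t read_offset;  /* written only by the consumer */
    _Atomic uint32_t write_offset; /* written only by the producer */
    int slots[QUEUE_LENGTH];
} spsc_queue;

static uint32_t next_offset(uint32_t offset)
{
    return (offset + 1) % (2 * QUEUE_LENGTH);
}

static void queue_init(spsc_queue *q)
{
    atomic_init(&q->read_offset, 0);
    atomic_init(&q->write_offset, 0);
}

/* Producer side: returns false if the queue is full. */
static bool queue_push(spsc_queue *q, int value)
{
    uint32_t w = atomic_load_explicit(&q->write_offset, memory_order_relaxed);
    uint32_t r = atomic_load_explicit(&q->read_offset, memory_order_acquire);
    if (w == r + QUEUE_LENGTH || w + QUEUE_LENGTH == r)
        return false;
    q->slots[w % QUEUE_LENGTH] = value;
    /* release: the slot write above is visible before the new offset */
    atomic_store_explicit(&q->write_offset, next_offset(w), memory_order_release);
    return true;
}

/* Consumer side: returns false if the queue is empty. */
static bool queue_pop(spsc_queue *q, int *value)
{
    uint32_t r = atomic_load_explicit(&q->read_offset, memory_order_relaxed);
    uint32_t w = atomic_load_explicit(&q->write_offset, memory_order_acquire);
    if (r == w)
        return false;
    *value = q->slots[r % QUEUE_LENGTH];
    /* release: the slot is fully read before it is handed back */
    atomic_store_explicit(&q->read_offset, next_offset(r), memory_order_release);
    return true;
}
```

As in the Windows version, each offset has exactly one writer, so no compare-and-swap loop or mutex is needed.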
Note that `InterlockedCompareExchange(pointer, 0, 0);` looks a bit complicated but is just a thread-safe equivalent to `*pointer`, i.e., it reads the value at the pointer: if `*pointer` is 0 it is "replaced" with 0, otherwise nothing is written, and either way the current value is returned. Similarly, `InterlockedExchange(pointer, value);` is the same as `*pointer = value;` but thread-safe. Both functions also generate a full memory barrier. Depending on the compiler and target architecture, interlocked operations may not be strictly necessary, but the performance impact is negligible so I recommend programming defensively.
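The read-via-compare-exchange idiom can be demonstrated in portable C11. In this hypothetical sketch, `cas_read()` plays the role of `InterlockedCompareExchange(pointer, 0, 0)` and `exchange_write()` the role of `InterlockedExchange(pointer, value)`. In portable code you would normally just call `atomic_load()` and `atomic_store()`; the CAS form is shown only to demonstrate why it never changes the stored value.

```c
#include <assert.h>
#include <stdatomic.h>

/* Atomic read via compare-and-swap: if *p == 0, 0 is swapped for 0;
   otherwise the CAS fails and copies the current value into expected.
   Either way *p is unchanged and its current value is returned. */
static long cas_read(atomic_long *p)
{
    long expected = 0;
    atomic_compare_exchange_strong(p, &expected, 0);
    return expected;
}

/* Atomic write; like InterlockedExchange, returns the previous value. */
static long exchange_write(atomic_long *p, long value)
{
    return atomic_exchange(p, value);
}
```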
Consider the case when the consumer crashes during (or before) the call to the `consume()` function. Because the read offset has not yet been advanced, the restarted consumer will pick up the same message again and process it as normal. As far as the producer is concerned, nothing unusual has happened, except that the message took longer than usual to be processed. An analogous situation occurs if the producer crashes while creating a message; when restarted, the first message generated will overwrite the incomplete one, and the consumer won't be affected.
If the crash occurs after the call to `InterlockedExchange` but before the call to `SetEvent` (at the top of the next loop iteration) in either the producer or the consumer, and if the queue was previously empty or full respectively, then the other process will not be woken up at that point. However, it will be woken up as soon as the crashed process is restarted, because the first thing the restarted process does is check the offsets and call `SetEvent`. You cannot lose slots in the queue, and the processes cannot deadlock.
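The crash scenarios in the last two paragraphs can be replayed in a small single-threaded model. Everything here is hypothetical: plain fields stand in for shared memory and a `bool` stands in for the event object. It shows that a consumer crash between reading a slot and advancing `read_offset` only causes the same message to be delivered again, that a producer crash between advancing `write_offset` and signalling is repaired by the restarted producer's first pass through the top of its loop, and that all `QUEUE_LENGTH` slots remain usable afterwards.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_LENGTH 4 /* illustrative size */

/* Hypothetical single-threaded model of the shared-memory state. */
typedef struct {
    uint32_t read_offset;
    uint32_t write_offset;
    int slots[QUEUE_LENGTH];
    bool consumer_event; /* auto-reset event modelled as a flag */
} shared_state;

static uint32_t increment_offset(uint32_t offset)
{
    return (offset + 1) % (2 * QUEUE_LENGTH);
}

/* Number of queued messages, derived only from the two offsets. */
static uint32_t used_slots(const shared_state *s)
{
    assert(s->read_offset < 2 * QUEUE_LENGTH && s->write_offset < 2 * QUEUE_LENGTH);
    return (s->write_offset + 2 * QUEUE_LENGTH - s->read_offset) % (2 * QUEUE_LENGTH);
}

/* Top of the producer loop: wake the consumer if the queue is not empty.
   A restarted producer runs this first, which repairs a missed wake-up. */
static void producer_loop_top(shared_state *s)
{
    if (s->read_offset != s->write_offset)
        s->consumer_event = true;
}

/* Rest of the producer loop: fill the slot, then publish it. */
static bool producer_add(shared_state *s, int msg)
{
    if (used_slots(s) == QUEUE_LENGTH)
        return false; /* full */
    s->slots[s->write_offset % QUEUE_LENGTH] = msg;
    s->write_offset = increment_offset(s->write_offset);
    return true;
}

/* Consumer: read the slot, then release it by advancing read_offset.
   With crash == true the process dies between those two steps. */
static bool consumer_take(shared_state *s, int *msg, bool crash)
{
    if (used_slots(s) == 0)
        return false; /* empty */
    *msg = s->slots[s->read_offset % QUEUE_LENGTH];
    if (!crash)
        s->read_offset = increment_offset(s->read_offset);
    return true;
}
```

Note that the replayed message is delivered twice, so if duplicates matter the consumer's processing should tolerate seeing the same message again.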
I think the simple multiple-producer single-consumer case would look something like this:
```c
void producer(void)
{
    for (;;)
    {
        DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
        // write_offset is shared by all producers, so read it atomically too
        if (current_read_offset != (DWORD)InterlockedCompareExchange(write_offset, 0, 0))
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal_consumer_event);
        }
        produce_in_local_cache();
        claim_mutex();
        for (;;)
        {
            // read offset may have changed, re-read it
            current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
            if ((*write_offset != current_read_offset + QUEUE_LENGTH) &&
                (*write_offset + QUEUE_LENGTH != current_read_offset))
            {
                break; // there is a free slot
            }
            // Queue is full: wake the consumer and wait for it to remove a
            // message. Keep the mutex (other producers queue up on it) and
            // keep the cached message rather than producing a new one.
            SetEvent(signal_consumer_event);
            WaitForSingleObject(signal_producer_event, INFINITE);
        }
        copy_from_local_cache_to_shared_memory((*write_offset) % QUEUE_LENGTH);
        MemoryBarrier();
        _ReadWriteBarrier();
        InterlockedExchange(write_offset, increment_offset(*write_offset));
        release_mutex();
    }
}
```
If the active producer crashes, the mutex will be detected as abandoned: the next wait on it returns `WAIT_ABANDONED` and grants ownership to the waiting producer. You can treat this case as if the mutex had been properly released. If the crashed process got as far as incrementing the write offset, the entry it added will be processed as usual; if not, it will be overwritten by whichever producer next claims the mutex. In neither case is any special action needed.
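For anyone porting this design to POSIX, the nearest equivalent of an abandoned mutex is a robust mutex: the next `pthread_mutex_lock()` returns `EOWNERDEAD`, the caller owns the mutex, and `pthread_mutex_consistent()` marks it usable again. The sketch below is a swapped-in illustration, not part of the Windows code. A thread that exits while holding the lock stands in for the crashed producer, and `queue_mutex`, `crashing_producer()` and `recover_abandoned_mutex()` are hypothetical names.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>

static pthread_mutex_t queue_mutex; /* hypothetical stand-in for the named mutex */

/* Stands in for a producer that dies while holding the mutex. */
static void *crashing_producer(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&queue_mutex);
    return NULL; /* exits without unlocking */
}

/* Returns 1 if the owner's death was reported and the mutex recovered. */
static int recover_abandoned_mutex(void)
{
    pthread_mutexattr_t attr;
    pthread_t thread;
    int rc, recovered = 0;

    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
    /* A real cross-process queue would place the mutex in shared memory
       and also set PTHREAD_PROCESS_SHARED. */
    pthread_mutex_init(&queue_mutex, &attr);
    pthread_mutexattr_destroy(&attr);

    pthread_create(&thread, NULL, crashing_producer, NULL);
    pthread_join(thread, NULL); /* the "crashed" owner is now gone */

    rc = pthread_mutex_lock(&queue_mutex);
    if (rc == EOWNERDEAD) {
        /* We own the mutex; the offsets are still consistent, so just
           mark the mutex usable again, as in the WAIT_ABANDONED case. */
        recovered = (pthread_mutex_consistent(&queue_mutex) == 0);
    } else if (rc != 0) {
        return 0; /* lock failed; nothing to unlock */
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_mutex_destroy(&queue_mutex);
    return recovered;
}
```

On older glibc versions this needs to be built with `-pthread`.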