I am having an issue where it looks like my ConcurrentQueue in a singleton is not processing items in the correct order. I know it's FIFO, so I am thinking that maybe the queue in memory is not the same somehow or something is going wrong with my Dequeue? The way I am testing this is by firing 3 postman requests to my API endpoint quickly. If anybody can help me understand why these are not running after each other I will be greatly appreciative!
As of now, I am leaning towards the Queue.TryPeek is not working correctly as the 2nd and 3rd requests seem to be queuing before the first one is dequeued.
So when I run the below code, I am seeing the following output in the console.
Queued message: Test 1
Sending message: Test 1
Queued message: Test 2
Sending message: Test 2
Dequeuing message: Test 2
Returning response: Test 2
Queued message: Test 3
Sending message: Test 3
Dequeuing message: Test 1
Returning response: Test 1
Dequeuing message: Test 3
Returning response: Test 3
This is my API controller method that is getting a message and queuing that message, once the message is queued it will wait until it see's that request's message in the front and then send it and then dequeue it.
Controller
[HttpPost]
[Route("message")]
public IActionResult SendMessageUser([FromBody]Message request)
{
Console.WriteLine($"Queued message: {request.Message}");
_messageQueue.QueueAndWaitForTurn(request);
Console.WriteLine($"Sending message: {request.Message}");
var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
Console.WriteLine($"Dequeuing message: {request.Message}");
_messageQueue.DequeueMessage(request);
Console.WriteLine($"Returning response: {request.Message}");
return Ok(sendMessageResponse);
}
As for the Queue, I am binding it to IoC like so:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IMessageQueue, MessageQueue>();
services.AddScoped<IMessageService, MessageService>();
services.AddMvc();
}
And this is my Queue class singleton, I am using a Singleton here because I would like there by only 1 instance of this queue throughout the application's lifetime.
public class MessageQueue : IMessageQueue
{
private Lazy<ConcurrentQueue<Message>> _queue =
new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());
public ConcurrentQueue<Message> Queue
{
get
{
return _queue.Value;
}
}
public void QueueAndWaitForTurn(Message message)
{
Queue.Enqueue(message);
WaitForTurn();
}
public bool DequeueMessage(Message message)
{
var messageIsDequeued = Queue.TryDequeue(out message);
return messageIsDequeued;
}
public void WaitForTurn()
{
Message myMessage = null;
var myMessageIsNext = Queue.TryPeek(out myMessage);
while (!Queue.TryPeek(out myMessage))
{
Thread.Sleep(1000);
WaitForTurn();
}
}
}
I'd create a kind of FifoSemaphore:
public class FifoSemaphore : IDisposable
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
private readonly object _lockObject = new object();
public async Task WaitAsync()
{
// Enqueue a task
Task resultTask;
lock (_lockObject)
{
var tcs = new TaskCompletionSource<object>();
resultTask = tcs.Task;
_taskQueue.Enqueue(tcs);
}
// Wait for the lock
await _semaphore.WaitAsync();
// Dequeue the next item and set it to resolved (release back to API call)
TaskCompletionSource<object> queuedItem;
lock (_lockObject)
{
queuedItem = _taskQueue.Dequeue();
}
queuedItem.SetResult(null);
// Await our own task
await resultTask;
}
public void Release()
{
// Release the semaphore so another waiting thread can enter
_semaphore.Release();
}
public void Dispose()
{
_semaphore?.Dispose();
}
}
And then use it like this:
[HttpPost]
[Route("message")]
public async Task<IActionResult> SendMessageUser([FromBody]Message request)
{
try
{
await _fifoSemaphore.WaitAsync();
// process message code here
}
finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
{
_fifoSemaphore.Release();
}
}
The idea is that each waiting item will first be queued.
Next, we wait for the semaphore to let us in (our semaphore allows one item at a time).
Then we dequeue the next waiting item, and release it back to the API method.
Finally, we wait for our own position in the queue to complete and then return to the API method.
In the API method, we asynchronously wait for our turn, do our task, and then return. A try/finally is included to ensure that the semaphore will be released for subsequent messages, even in case of failure.