c# .net multithreading asynchronous blockingcollection

Awaiting an async method in a consumer that uses BlockingCollection as a queue


I am working on a server-side console application which receives data from multiple WCF services, does some work on it, and then forwards the results to an IIS server over a single connection using SignalR.

I tried to implement this using the producer-consumer pattern, where the WCF services are the producers and the class sending the data using SignalR is the consumer. For the queue I used a BlockingCollection.

However, when I use async/await to send the data, the consumer's while loop gets stuck until all other threads have finished adding data to the queue.

For testing purposes I have replaced the code that actually sends the data with either Task.Delay(1000).Wait(); or await Task.Delay(1000); and both get stuck as well. A simple Thread.Sleep(1000); seems to work just fine, which leads me to think the asynchronous code is the problem.

So my question is: Is there something preventing the asynchronous code from completing in the while loop? What am I missing?

I'm starting the consumer thread like this:

new Thread(Worker).Start();

And the consumer code:

private void Worker()
{
    while (!_queue.IsCompleted)
    {
        IMobileMessage msg = null;
        try
        {
            msg = _queue.Take();
        }
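        catch (InvalidOperationException)
        {
            // Take() throws once the queue is empty and marked as complete for adding;
            // msg stays null and the loop condition ends the loop.
        }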

        if (msg != null)
        {
            try
            {
                Trace.TraceInformation("Sending: {0}", msg.Name);
                Thread.Sleep(1000); // <-- works
                //Task.Delay(1000).Wait(); // <-- doesn't work
                msg.SentTime = DateTime.UtcNow;
                Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
            }
            catch (Exception e)
            {
                TraceException(e);
            }
        }
    }
}

Solution

  • As spender correctly pointed out, BlockingCollection (as the name implies) is intended only for use with blocking code, and does not work so well with asynchronous code.

    There are async-compatible producer/consumer queues, such as BufferBlock<T>. In this case, I would think ActionBlock<T> would be even better:

    private ActionBlock<IMobileMessage> _block = new ActionBlock<IMobileMessage>(async msg =>
    {
      try
      {
        Trace.TraceInformation("Sending: {0}", msg.Name);
        await Task.Delay(1000);
        msg.SentTime = DateTime.UtcNow;
        Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
      }
      catch (Exception e)
      {
        TraceException(e);
      }
    });
    

    This replaces your entire consuming thread and main loop.
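The answer does not show the producer side, so here is a minimal sketch of how producers might feed such a block and how shutdown could look. It assumes plain string messages instead of IMobileMessage, and the names ProducerSketch and RunAsync, the Task.Run producers, and the 10 ms delay are hypothetical stand-ins for the WCF services and the SignalR send. On .NET Framework, ActionBlock<T> comes from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ProducerSketch
{
    // Hypothetical stand-in for the WCF producers: each one posts a few named messages.
    public static async Task<List<string>> RunAsync(int producers = 3, int perProducer = 2)
    {
        var sent = new List<string>();

        // With the default options the block runs one delegate at a time,
        // so the list is never modified by two messages concurrently.
        var block = new ActionBlock<string>(async name =>
        {
            await Task.Delay(10); // placeholder for the real asynchronous send
            sent.Add(name);
        });

        // Post does not block; with the default unbounded capacity it returns
        // true as long as the block is still accepting messages.
        var producerTasks = Enumerable.Range(0, producers)
            .Select(p => Task.Run(() =>
            {
                for (var i = 0; i < perProducer; i++)
                {
                    block.Post($"producer{p}-msg{i}");
                }
            }))
            .ToArray();

        await Task.WhenAll(producerTasks);

        block.Complete();       // plays the role of BlockingCollection.CompleteAdding()
        await block.Completion; // completes once every queued message has been handled

        return sent;
    }
}
```

Calling await ProducerSketch.RunAsync() from an async method should return all six message names once the block has drained. In the original setup, each WCF service would call _block.Post(msg) where it previously called _queue.Add(msg), and Complete() followed by awaiting Completion would take the place of CompleteAdding() plus joining the worker thread.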