Use events to forward exceptions for producer/consumer using Dataflow


I’m trying to implement a producer/consumer queue using TPL Dataflow for HTTP requests to a web service. I found an excellent post by Stephen Cleary that covers exactly this scenario. However, in contrast to Stephen’s post, I cannot mark the producer queue as complete, since clients must be able to enqueue requests throughout the entire lifetime of the application. The idea behind this approach is that the client can keep producing requests, and the consumer can handle requests differently when more than one request is pending (which is a requirement).

This requirement also means that consumption cannot start after production has finished; it has to start as soon as the first request is enqueued. It also requires me to start the consumption in a non-blocking way (otherwise it would lead to a deadlock). I’ve done this via an async call that is not awaited, which unfortunately hampers exception handling: exceptions occurring during consumption (i.e. while performing the HTTP requests) cannot bubble up, because the call to the consume function is not awaited. I’ve introduced an event to deal with this problem, but this leads me to the following questions:

  1. Is it a good idea to use an event to forward exceptions from the consumer to the client of the producer?
  2. Is it a good idea to implement the producer/consumer pattern in this fashion for my use case?
  3. Are there other approaches that would be more beneficial under the given circumstances?

To make it more explicit, I’ve prepared a code example illustrating the problem described above:

public class HttpConnector 
{
    private BufferBlock<RequestPayload> queue;

    public delegate void ConnectorExceptionHandler(object sender, Exception e);
    public event ConnectorExceptionHandler ConnectorExceptionOccured;

    public async Task<bool> ProduceRequest(RequestPayload payload)
    {
        if(this.queue == null)
        {
            this.queue = new BufferBlock<RequestPayload>();
            this.ConsumeRequestsAsync(queue);   //this call cannot be awaited since it would lead to a deadlock
                                                //however, by not awaiting this call all exceptions raised in 
                                                //ConsumeRequestsAsync will be lost
        }
        return await queue.SendAsync(payload);
    }

    public async Task ConsumeRequestsAsync(BufferBlock<RequestPayload> queue)
    {
        while(await queue.OutputAvailableAsync())
        {
            try
            {
                var payload = await queue.ReceiveAsync();
                //do the HTTP request...
            }
            catch (Exception e)
            {
                var handler = ConnectorExceptionOccured;    //copy to avoid a NullReferenceException without subscribers
                if (handler != null) handler(this, e);      //fire event to forward the exception to the client
            }
        }
    }
}

public class Client 
{
    private HttpConnector connector = new HttpConnector();

    public Task<bool> UpdateDataAsync()
    {
        connector.ConnectorExceptionOccured += (object sender, Exception e) =>  //register an event handler to receive exceptions
                                                                                //that occur during the consumption of the requests
        {
            //handle exception or re-throw 
        }; 
        return connector.ProduceRequest(new RequestPayload());  //produce a request
    }
}

Solution

  • Forwarding exceptions via an event has some severe drawbacks:

    • Natural exception handling (try/catch around the call) is not possible. Developers who are not aware of this mechanism won't catch any exceptions.
    • You cannot rely on AppDomain.UnhandledException to catch unhandled exceptions at application runtime. In fact, if nothing is subscribed to the exception event, the exception is completely lost.
    • If there is only one event to subscribe to, the exception object needs a lot of context information so that the handler can figure out which operation caused the exception.
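
    To illustrate the second point, here is a minimal sketch of how an exception thrown inside a non-awaited async method stays inside the returned Task instead of reaching the caller. FireAndForgetDemo and FailingConsumerAsync are hypothetical names used only for this illustration; they are not part of the code in the question.

    ```csharp
    using System;
    using System.Threading.Tasks;

    public static class FireAndForgetDemo
    {
        // Hypothetical stand-in for a consumer loop that fails.
        public static async Task FailingConsumerAsync()
        {
            await Task.Yield();
            throw new InvalidOperationException("HTTP request failed");
        }

        // Returns true only if the caller's try/catch saw the exception.
        public static bool CallerSeesException()
        {
            try
            {
                // Not awaited: the exception is stored in the returned Task,
                // so this catch block is never entered.
                Task ignored = FailingConsumerAsync();
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }

        // Awaiting the Task rethrows the stored exception at the await.
        public static async Task<bool> AwaiterSeesExceptionAsync()
        {
            try
            {
                await FailingConsumerAsync();
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }
    }
    ```

    Here CallerSeesException() returns false, whereas AwaiterSeesExceptionAsync() completes with true, which is why the fire-and-forget call needs some other channel to report failures.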

    For our problem it turned out that it is better to use TaskCompletionSource, a standard way to expose the outcome of an operation as a Task that the caller can await. Each RequestPayload object carries its own TaskCompletionSource instance. Once the request has been consumed, the consumer completes TaskCompletionSource.Task, either with the result or with an exception. The producer doesn't return the Task of queue.SendAsync(payload) but awaits payload.CompletionSource.Task instead:

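    public class RequestPayload
    {
        public RequestPayload()
        {
            //one completion source per request; the consumer completes it
            CompletionSource = new TaskCompletionSource<IResultBase>();
        }

        public IModelBase Payload { get; set; }
        public TaskCompletionSource<IResultBase> CompletionSource { get; private set; }
    }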
    
    public class HttpConnector 
    {
        private BufferBlock<RequestPayload> queue;
    
        public async Task<IResultBase> ProduceRequest(RequestPayload payload)
        {
            if(this.queue == null)
            {
                this.queue = new BufferBlock<RequestPayload>();
                this.ConsumeRequestsAsync(queue);  //still not awaited; failures are reported per request
            }
            await queue.SendAsync(payload);
            return await payload.CompletionSource.Task;  //completes or throws once the request was consumed
        }
    
        public async Task ConsumeRequestsAsync(BufferBlock<RequestPayload> queue)
        {
            while(await queue.OutputAvailableAsync())
            {
                //receive outside the try block so that payload is in scope in the catch block
                var payload = await queue.ReceiveAsync();
                try
                {
                    //do the HTTP request...
                    payload.CompletionSource.TrySetResult(null);
                }
                catch (Exception e)
                {
                    payload.CompletionSource.TrySetException(e);
                }
            }
        }
    }
    
    public class Client 
    {
        private HttpConnector connector = new HttpConnector();
    
        public async Task UpdateDataAsync()
        {
            try
            {
                 await connector.ProduceRequest(new RequestPayload());
            } 
            catch(Exception e) { /*handle exception*/ }
        }
    }
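
    The TaskCompletionSource mechanic that the code above relies on can also be tried in isolation, without Dataflow. The following is a minimal sketch, assuming a hypothetical WorkItem class with a string result in place of RequestPayload and IResultBase; Process stands in for the consumer, and the failure condition (an empty URL) is made up for the example.

    ```csharp
    using System;
    using System.Threading.Tasks;

    // Hypothetical work item; plays the role of RequestPayload.
    public class WorkItem
    {
        public WorkItem(string url)
        {
            Url = url;
            CompletionSource = new TaskCompletionSource<string>();
        }

        public string Url { get; private set; }
        public TaskCompletionSource<string> CompletionSource { get; private set; }
    }

    public static class CompletionSourceDemo
    {
        // Stand-in for the consumer: completes the item's task
        // with a result or with an exception.
        public static void Process(WorkItem item)
        {
            try
            {
                if (item.Url.Length == 0)
                    throw new InvalidOperationException("empty URL"); // simulated failure
                item.CompletionSource.TrySetResult("OK: " + item.Url);
            }
            catch (Exception e)
            {
                item.CompletionSource.TrySetException(e);
            }
        }

        // Stand-in for the client: awaits the per-item task with plain try/catch.
        public static async Task<string> AwaitOutcomeAsync(WorkItem item)
        {
            try
            {
                return await item.CompletionSource.Task;
            }
            catch (InvalidOperationException e)
            {
                return "failed: " + e.Message;
            }
        }
    }
    ```

    Each caller observes the outcome of its own item through ordinary try/catch, so no shared exception event and no extra context information on the exception are needed.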