Tags: c#, exception, task, blockingcollection

How do I handle an Exception from an anonymous Task?


I have a method that pulls data from a server in chunks and returns it for processing. I made some measurements and found that it is significantly faster to download chunks in the background and return them via a BlockingCollection<T>. This allows the client and the server to work at the same time rather than waiting on one another.

public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
    BlockingCollection<DataRecord> records = new BlockingCollection<DataRecord>();

    Task.Run(
        () =>
        {
            Boolean isMoreData = false;
            do
            {
                // make server request and process response
                // this block can throw

                records.Add(response.record);
                isMoreData = response.IsMoreData;
            }
            while (isMoreData);

            records.CompleteAdding();
        });

    return records.GetConsumingEnumerable();
}

The caller (a C++/CLI library) should know that an exception occurred so it can try again or bail out as appropriate. What's the best way to propagate the exception to the caller while minimally changing the return type?


Solution

  • This is why fire-and-forget tasks are generally a bad idea. In your case it is even worse: the loop that adds records is not wrapped in a try/finally with records.CompleteAdding() in the finally block, so if the producer throws, CompleteAdding is never called and MoveNext on the enumerator returned by GetConsumingEnumerable will eventually block indefinitely. That is very bad.
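To make the hang concrete, here is a small self-contained sketch. The class and method names are hypothetical, and the producer simply throws on its third item to simulate a failed server request. With completeInFinally set to false, the consumer takes the two items and then stays blocked in MoveNext until a cancellation token fires (the token exists only so the demo terminates); with the try/finally in place, the loop ends normally.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Hypothetical producer: adds items 1 and 2, then throws on item 3,
    // standing in for a server request that fails mid-stream.
    public static Task StartProducer(BlockingCollection<int> items, bool completeInFinally)
    {
        return Task.Run(() =>
        {
            try
            {
                for (int i = 1; i <= 3; i++)
                {
                    if (i == 3)
                    {
                        throw new InvalidOperationException("simulated server failure");
                    }
                    items.Add(i);
                }
            }
            finally
            {
                // The fix: signal "no more items" even when the loop throws.
                if (completeInFinally)
                {
                    items.CompleteAdding();
                }
            }
        });
    }

    // Returns the number of items consumed, or -1 if the consumer was still
    // blocked waiting for more items when the timeout fired.
    public static int Consume(bool completeInFinally, int timeoutMs = 1000)
    {
        var items = new BlockingCollection<int>();
        StartProducer(items, completeInFinally);

        using var cts = new CancellationTokenSource(timeoutMs);
        int count = 0;
        try
        {
            foreach (int item in items.GetConsumingEnumerable(cts.Token))
            {
                count++;
            }
        }
        catch (OperationCanceledException)
        {
            // Without CompleteAdding, only the timeout ends the loop.
            return -1;
        }
        return count;
    }
}
```

Note that even in the fixed case the producer's exception is still unobserved here; the try/finally only prevents the hang.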

    If you were operating entirely within C#, the solution would be simple: better separation of concerns. Strip the BlockingCollection out of GetData and use it where it belongs: in the consumer (client), or in an intermediate pipeline stage (which is ultimately what you're trying to build), designed so that it remains aware of any exceptions thrown by the producer. Your GetData signature stays the same, but the method becomes a simple blocking iterator with full exception propagation:

    public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
    {
        Boolean isMoreData = false;
        do
        {
            // make server request and process response
            // this block can throw; because GetData is an iterator,
            // the exception surfaces in the caller's foreach (MoveNext)
    
            yield return response.record;
            isMoreData = response.IsMoreData;
        }
        while (isMoreData);
    }
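A quick way to see that propagation in isolation is the sketch below. GetChunks is a hypothetical stand-in for GetData (using int chunks instead of DataRecord) that throws when it reaches a given chunk. The exception comes out of the caller's foreach, on the caller's thread, after every earlier chunk has been processed.

```csharp
using System;
using System.Collections.Generic;

public static class IteratorPropagationDemo
{
    // Hypothetical stand-in for GetData: yields chunks 1..count and throws
    // when it reaches failAt, the way a failed server request would.
    public static IEnumerable<int> GetChunks(int count, int failAt)
    {
        for (int i = 1; i <= count; i++)
        {
            if (i == failAt)
            {
                throw new InvalidOperationException($"request for chunk {i} failed");
            }
            yield return i;
        }
    }

    // Consumes every chunk; returns how many were processed and the error
    // message, if the iterator threw (null otherwise).
    public static (int processed, string error) ProcessAll(int count, int failAt)
    {
        int processed = 0;
        try
        {
            foreach (int chunk in GetChunks(count, failAt))
            {
                processed++;
            }
            return (processed, null);
        }
        catch (InvalidOperationException ex)
        {
            // Thrown from MoveNext inside the foreach above.
            return (processed, ex.Message);
        }
    }
}
```

For example, ProcessAll(5, failAt: 3) processes chunks 1 and 2 and then reports "request for chunk 3 failed".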
    

    Then the pipeline looks like this:

    var records = new BlockingCollection<DataRecord>();
    
    var producer = Task.Run(() =>
    {
        try
        {
            foreach (var record in GetData("http://foo.com/Service", 22))
            {
                // Hand over the record to the
                // consumer and continue enumerating.
                records.Add(record);
            }
        }
        finally
        {
            // This needs to be called even in
            // exceptional scenarios so that the
            // consumer task does not block
            // indefinitely on the call to MoveNext.
            records.CompleteAdding();
        }
    });
    
    var consumer = Task.Run(() =>
    {
        foreach (var record in records.GetConsumingEnumerable())
        {
            // Do something with the record yielded by GetData.
            // This runs in parallel with the producer,
            // so you get concurrent download and processing
            // with a safe handover via the BlockingCollection.
        }
    });
    
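    // This must run inside an async method. If the producer
    // faulted, awaiting WhenAll rethrows its exception here.
    await Task.WhenAll(producer, consumer);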
    

    Now you can have your cake and eat it too: the processing happens in parallel as records are yielded by GetData, awaiting the producer task propagates any exceptions, and calling CompleteAdding inside the finally block ensures that your consumer never gets stuck in a blocking state.
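The following runnable sketch checks that claim end to end. It assumes int records in place of DataRecord, and FailingSource and PipelineDemo are hypothetical names: the source yields two records and then throws. The consumer still sees both records and finishes, because CompleteAdding runs in the finally block, and the awaiting caller then receives the producer's exception.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineDemo
{
    // Hypothetical source standing in for GetData: yields 1 and 2, then fails.
    static IEnumerable<int> FailingSource()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("server went away");
    }

    public static async Task<(int consumed, string error)> RunAsync()
    {
        var records = new BlockingCollection<int>();
        int consumed = 0;

        var producer = Task.Run(() =>
        {
            try
            {
                foreach (var record in FailingSource())
                {
                    records.Add(record);
                }
            }
            finally
            {
                records.CompleteAdding(); // unblocks the consumer even on failure
            }
        });

        var consumer = Task.Run(() =>
        {
            foreach (var record in records.GetConsumingEnumerable())
            {
                consumed++; // only the consumer task writes this counter
            }
        });

        try
        {
            await Task.WhenAll(producer, consumer);
            return (consumed, null);
        }
        catch (InvalidOperationException ex)
        {
            // WhenAll completes only after both tasks finish, so the consumer
            // is done; await rethrows the producer's exception.
            return (consumed, ex.Message);
        }
    }
}
```

Awaiting PipelineDemo.RunAsync() should yield consumed == 2 with the error message "server went away".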

    Since you're working with C++, the above still applies to an extent (strictly speaking, the right thing to do would be to reimplement the pipeline in C++), but the implementation may not be as pretty, and the approach you took in your own answer may well be the preferred solution, even if it feels like a hack because of the unobserved task. I can't think of a scenario where it would actually go wrong, since CompleteAdding always gets called thanks to the newly introduced try/catch.
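If you want to keep the background task and the IEnumerable<DataRecord> return type, one option is to rethrow the producer's exception from the consuming side once the buffer has drained. This is a different technique from the try/catch approach discussed above and is shown only as a sketch; BufferedEnumerable, Buffer and Drain are hypothetical names, and GetData's existing do/while loop, moved into its own iterator, would be passed in as the source.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BufferedEnumerable
{
    // Starts draining 'source' on a background task immediately and returns
    // an enumerable over the buffered items; the return type stays IEnumerable<T>.
    public static IEnumerable<T> Buffer<T>(IEnumerable<T> source)
    {
        var items = new BlockingCollection<T>();
        Task producer = Task.Run(() =>
        {
            try
            {
                foreach (T item in source)
                {
                    items.Add(item);
                }
            }
            finally
            {
                items.CompleteAdding(); // the consumer never blocks forever
            }
        });
        return Drain(items, producer);
    }

    // A separate iterator method, so that Buffer itself runs eagerly
    // (an iterator body would not start the producer until the first MoveNext).
    private static IEnumerable<T> Drain<T>(BlockingCollection<T> items, Task producer)
    {
        foreach (T item in items.GetConsumingEnumerable())
        {
            yield return item;
        }
        // All items are consumed and the producer has finished or faulted.
        // GetResult rethrows the original exception (not an AggregateException)
        // on the consumer's thread, so the fault is observed.
        producer.GetAwaiter().GetResult();
    }
}
```

The caller sees every record downloaded before the failure and then the exception on its final MoveNext. One caveat: if the caller stops enumerating early, Drain never reaches GetResult, so a later producer fault goes unobserved again, and the producer keeps running until the source is exhausted.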

    Another obvious solution would be to move the processing code into your C# project, which may or may not be possible depending on your architecture.