Search code examples
c#async-awaittaskproducer-consumer

Infinite producer/consumer via serial port data


I'm currently reading in data via a SerialPort connection in an asynchronous Task in a console application that will theoretically run forever (always picking up new serial data as it comes in).

I have a separate Task that is responsible for pulling that serial data out of a HashSet type that gets populated from my "producer" task above and then it makes an API request with it. Since the "producer" will run forever, I need the "consumer" task to run forever as well to process it.

Here's a contrived example:

TagItems = new HashSet<Tag>();
Sem = new SemaphoreSlim(1, 1);
SerialPort = new SerialPort("COM3", 115200, Parity.None, 8, StopBits.One);
// serialport settings...

try
{
  var producer = StartProducerAsync(cancellationToken);
  var consumer = StartConsumerAsync(cancellationToken);

  await producer; // this feels weird
  await consumer; // this feels weird
}
catch (Exception e)
{
  Console.WriteLine(e); // when I manually throw an error in the consumer, this never triggers for some reason
}

Here's the producer / consumer methods:

private async Task StartProducerAsync(CancellationToken cancellationToken)
{
    using var reader = new StreamReader(SerialPort.BaseStream);
    while (SerialPort.IsOpen)
    {
        var readData = await reader.ReadLineAsync()
            .WaitAsync(cancellationToken)
            .ConfigureAwait(false);

        var tag = new Tag {Data = readData};
        await Sem.WaitAsync(cancellationToken);
        TagItems.Add(tag);
        Sem.Release();

        await Task.Delay(100, cancellationToken);
    }

    reader.Close();
}

private async Task StartConsumerAsync(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        await Sem.WaitAsync(cancellationToken);
        if (TagItems.Any())
        {
            foreach (var item in TagItems)
            {
                await SendTagAsync(tag, cancellationToken);
            }
        }

        Sem.Release();
        await Task.Delay(1000, cancellationToken);
    }
}

I think there are multiple problems with my solution but I'm not quite sure how to make it better. For instance, I want my "data" to be unique so I'm using a HashSet, but that data type isn't concurrent-friendly so I'm having to lock with a SemaphoreSlim which I'm guessing could present performance issues with large amounts of data flowing through.

I'm also not sure why my catch block never triggers when an exception is thrown in my StartConsumerAsync method.

Finally, are there better / more modern patterns I can be using to solve this same problem in a better way? I noticed that Channels might be an option but a lot of producer/consumer examples I've seen start with a producer having a fixed number of items that it has to "produce", whereas in my example the producer needs to stay alive forever and potentially produces infinitely.


Solution

  • First things first, starting multiple asynchronous operations and awaiting them one by one is wrong:

    // Wrong
    await producer;
    await consumer;
    

    The reason is that if the first operation fails, the second operation will become fire-and-forget. And allowing tasks to escape your supervision and continue running unattended, can only contribute to your program's instability. Nothing good can come out from that.

    // Correct
    await Task.WhenAll(producer, consumer)
    

    Now regarding your main issue, which is how to make sure that a failure in one task will cause the timely completion of the other task. My suggestion is to hook the failure of each task with the cancellation of a CancellationTokenSource. In addition, both tasks should watch the associated CancellationToken, and complete cooperatively as soon as possible after they receive a cancellation signal.

    var cts = new CancellationTokenSource();
    Task producer = StartProducerAsync(cts.Token).OnErrorCancel(cts);
    Task consumer = StartConsumerAsync(cts.Token).OnErrorCancel(cts);
    await Task.WhenAll(producer, consumer)
    

    Here is the OnErrorCancel extension method:

    public static Task OnErrorCancel(this Task task, CancellationTokenSource cts)
    {
        return task.ContinueWith(t =>
        {
            if (t.IsFaulted) cts.Cancel();
            return t;
        }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
    }
    

    Instead of doing this, you can also just add an all-enclosing try/catch block inside each task, and call cts.Cancel() in the catch.