c# .net system.threading.channels

Multiple consumers without losing messages


I have a firehose of messages coming through a Redis pub/sub channel, and I need to distribute them to a number of WebSocket connections. In other words, whenever a message arrives from Redis, it must be sent to every WebSocket connection.

I want multiple consumers, and each of them should receive all of the messages.

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false
});
var cts = new CancellationTokenSource();


var producer = Task.Run(async () =>
{
    int i = 0;
    while (!cts.IsCancellationRequested)
    {
        channel.Writer.TryWrite(i++);

        await Task.Delay(TimeSpan.FromMilliseconds(250));
    }
});

var readerOneTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader one: {i}");
    }
});

var readerTwoTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader two: {i}");
    }
});

cts.CancelAfter(TimeSpan.FromSeconds(5));

Console.ReadLine();

Solution

  • A single Channel<T> cannot broadcast messages to multiple consumers. Every time a message is read from the channel, the message is consumed, and no other consumer is going to get it. If you want to broadcast all messages to all consumers, you'll have to create one dedicated Channel<T> per consumer.
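    To make the consumption behavior concrete, here is a minimal sketch in which two readers share one channel. The helper name ReadAll and the item count of 6 are made up for illustration. How the items are split between the two readers varies from run to run, but each item is delivered to exactly one of them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// One shared channel, pre-filled with six items and then completed.
var shared = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 6; i++) shared.Writer.TryWrite(i);
shared.Writer.Complete();

// Hypothetical helper: drains the reader and returns what this reader saw.
async Task<List<int>> ReadAll(ChannelReader<int> reader)
{
    var seen = new List<int>();
    await foreach (var item in reader.ReadAllAsync())
    {
        seen.Add(item);
        await Task.Yield(); // give the other reader a chance to take items
    }
    return seen;
}

Task<List<int>> readerOne = Task.Run(() => ReadAll(shared.Reader));
Task<List<int>> readerTwo = Task.Run(() => ReadAll(shared.Reader));
List<int>[] results = await Task.WhenAll(readerOne, readerTwo);

// The split is nondeterministic, but the two lists never overlap.
Console.WriteLine($"Reader one saw: {string.Join(", ", results[0])}");
Console.WriteLine($"Reader two saw: {string.Join(", ", results[1])}");
Console.WriteLine($"Total items read: {results[0].Count + results[1].Count}");
```

    The total is always 6, and no item appears in both lists, which is why a single channel cannot act as a broadcast.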

    You might find this question interesting: "Factory for IAsyncEnumerable or IAsyncEnumerator". It shows various ways to implement a source/controller for an IAsyncEnumerable<T> sequence, including channels and Rx subjects.


    Update: Below is a demo of how you could use multiple channels to propagate all the messages to all the consumers.

    List<Channel<int>> channels = new();
    
    async Task CreateConsumer(Func<Channel<int>, Task> body)
    {
        var channel = Channel.CreateUnbounded<int>();
        lock (channels) channels.Add(channel);
        try
        {
            await Task.Run(() => body(channel)).ConfigureAwait(false);
        }
        finally
        {
            lock (channels) channels.Remove(channel);
        }
    }
    
    Task consumer1 = CreateConsumer(async channel =>
    {
        await foreach (var i in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Consumer one: {i}");
        }
    });
    
    Task consumer2 = CreateConsumer(async channel =>
    {
        await foreach (var i in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Consumer two: {i}");
        }
    });
    
    using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
    Task producer = Task.Run(async () =>
    {
        int i = 0;
        while (true)
        {
            i++;
            lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
            try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
            catch (OperationCanceledException) { break; }
        }
    });
    
    try { producer.Wait(); } catch { }
    lock (channels) channels.ForEach(channel => channel.Writer.Complete());
    Task.WaitAll(producer, consumer1, consumer2);
    

    Try it on Fiddle.

    CreateConsumer is an asynchronous method that is responsible for creating the channel and adding it to the list. It is also responsible for removing the channel from the list when the consumer completes. This is important: otherwise, if a consumer failed, the producer would keep pushing messages into the dead channel, and since the channel is unbounded and nobody reads from it, the result would be a memory leak.
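    The cleanup behavior can be checked with a small sketch. It reuses CreateConsumer from the demo; the failing consumer body, the value 42, and the CountChannels helper are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

List<Channel<int>> channels = new();

// Same shape as the CreateConsumer method in the demo.
async Task CreateConsumer(Func<Channel<int>, Task> body)
{
    var channel = Channel.CreateUnbounded<int>();
    lock (channels) channels.Add(channel);
    try { await Task.Run(() => body(channel)).ConfigureAwait(false); }
    finally { lock (channels) channels.Remove(channel); }
}

// Hypothetical helper: reads the list size under the same lock.
int CountChannels() { lock (channels) return channels.Count; }

// A consumer that crashes after receiving its first message.
Task failing = CreateConsumer(async channel =>
{
    await channel.Reader.ReadAsync();
    throw new InvalidOperationException("consumer crashed");
});

int before = CountChannels(); // the channel is already registered here

lock (channels) channels.ForEach(c => c.Writer.TryWrite(42));

try { await failing; }
catch (InvalidOperationException ex) { Console.WriteLine($"Consumer failed: {ex.Message}"); }

int after = CountChannels(); // the finally block has removed the channel
Console.WriteLine($"Channels before: {before}, after: {after}");
// prints "Channels before: 1, after: 0"
```

    Because the removal sits in a finally block, it runs whether the consumer body finishes normally or throws.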

    The "body" of the consumer, which can be different for each consumer, is passed as an asynchronous lambda to the CreateConsumer method.

    It is important that all consumers are started, and their channels created, before the producer starts. That's why the call to CreateConsumer is not wrapped in a Task.Run. An async method runs synchronously on the caller's thread until its first await of an incomplete task, so by the time CreateConsumer returns its Task, the channel has already been added to the list.
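    The ordering guarantee relies on general async method behavior, which the following sketch illustrates in isolation. StartConsumer and the order list are hypothetical names, and Task.Delay stands in for the real consumer work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var order = new List<string>();

// Hypothetical stand-in for CreateConsumer: registers first, then awaits.
async Task StartConsumer(string name)
{
    order.Add($"{name} registered"); // runs synchronously on the caller's thread
    await Task.Delay(50);            // first incomplete await: control returns to the caller
    Console.WriteLine($"{name} finished");
}

Task one = StartConsumer("one");
order.Add("caller resumed after one");
Task two = StartConsumer("two");
order.Add("caller resumed after two");

await Task.WhenAll(one, two);
Console.WriteLine(string.Join(" -> ", order));
// prints "one registered -> caller resumed after one -> two registered -> caller resumed after two"
```

    Each registration is recorded before the caller gets control back, which is the property the demo depends on. Had the call been wrapped in Task.Run, the registration would have been scheduled on the thread pool and could have happened after the producer had already sent its first messages.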

    Every access to the list of channels is protected with a lock, because multiple threads might attempt to read or modify the list at the same time: the producer enumerates it while consumers add and remove their channels.