Search code examples
c#.netsystem.reactive

Rx - Multiple consumers without losing messages


I want multiple consumers and each of them should get all the messages (without missing any of them).

The actual is idea is that I have a firehose of stuff coming through a redis pub/sub and I need to distribute it to a number of websocket connections, so basically whenever a message comes from redis, it needs to be distributed through all websockets connections.

How do I achieve that with System.Reactive? Will the following work?

var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
    var publishVal = x;
    Console.WriteLine($@"observer1 publishing {publishVal}");
    return publishVal;
}).Publish();

var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs1.Subscribe(x => Console.WriteLine($@"subscriber2 value {x}"));

obs1.Connect();

Solution

  • From what you've described, this will work:

    var obs1 =
        Observable
            .Interval(TimeSpan.FromMilliseconds(500))
            .Do(x => Console.WriteLine($@"observer1 publishing {x}"));
    
    var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
    var sub2 = obs1.Subscribe(x => Console.WriteLine($@"subscriber2 value {x}"));