Search code examples
c#rabbitmqreactive-programmingsystem.reactiveeasynetq

Turning a EasynetQ subscriber into an observable


I am using EasyNetQ as client library when using RabbitMQ. To create a subscriber one do like

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

When an instance of MyMessage is published, EasyNetQ will call the delegate and print the message’s Text property to the console.

How do I turn this into an observable sequence? I have been looking at the Observable.Create and Observable.Generate methods but I cannot figure out how to bridge a RabbitMQ consumer and an observable sequence.

The Subscribe method returns an IDisposable so a solution should respect this such that the resource can be properly disposed.

I have noticed this solution but most people seem to advice against using Subject so I would like to find another solution.

Any hints or ideas are most welcome.


Solution

  • Does this work?

    var observable = Observable.Create<MyMessage>(o => 
        bus.Subscribe<MyMessage>("my_subscription_id", msg => o.OnNext(msg))
    );
    

    I don't have a problem with the way Subject is used in the referenced code: If Subject is a private (ideally read-only) field, only exposed as an IObservable then its impact is contained. However there's other problems with the code at that referenced link:

    • If there's multiple .Connect() calls, then there will be multiple subscriptions, only one of which will be disposed. This could lead to leaks (don't know how EasyNetQ subscriptions are implemented).

    • Best not to implement IObservable (or IConnectableObservable), because it's easy to make mistakes like the above.