I am using EasyNetQ as client library when using RabbitMQ. To create a subscriber one do like
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
When an instance of MyMessage is published, EasyNetQ will call the delegate and print the message’s Text property to the console.
How do I turn this into an observable sequence? I have been looking at the Observable.Create
and Observable.Generate
methods but I cannot figure out how to bridge a RabbitMQ consumer and an observable sequence.
The Subscribe
method returns an IDisposable
so a solution should respect this such that the resource can be properly disposed.
I have noticed this solution but most people seem to advice against using Subject
so I would like to find another solution.
Any hints or ideas are most welcome.
Does this work?
var observable = Observable.Create<MyMessage>(o =>
bus.Subscribe<MyMessage>("my_subscription_id", msg => o.OnNext(msg))
);
I don't have a problem with the way Subject
is used in the referenced code: If Subject
is a private (ideally read-only) field, only exposed as an IObservable
then its impact is contained. However there's other problems with the code at that referenced link:
If there's multiple .Connect()
calls, then there will be multiple subscriptions, only one of which will be disposed. This could lead to leaks (don't know how EasyNetQ subscriptions are implemented).
Best not to implement IObservable
(or IConnectableObservable
), because it's easy to make mistakes like the above.