I have a question on Observables (which I posted on the publishers sub forum for this book but I am still waiting on any response).
I use the helper methods provided as is the standard practice rather than handcrafting the observables. However just out of academic interest I did see into what it takes to handcraft an observable.
I saw an implementation in a book where at the end of subscribe method Disposable.Empty was returned. The code is somewhat like below.
public class MyObservable : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(1000);
observer.OnNext(i);
}
observer.OnCompleted();
return Disposable.Empty;
}
}
If I want to return a proper Disposable which will actually lead to unsubscribing when Dispose is called what should be the way?
I had a crack at it using this for the Observable and this for Observer
I had to introduce a subscription handler
public class SubscriptionHandler : IDisposable
{
private readonly List<IObserver<int>> _listOfObservers;
private readonly IObserver<int> _currentObserver;
public SubscriptionHandler(List<IObserver<int>> currentListOfObservers, IObserver<int> currentObserver)
{
_listOfObservers = currentListOfObservers;
_currentObserver = currentObserver;
}
public void Dispose()
{
if (_currentObserver != null && _listOfObservers.Contains(_currentObserver))
{
_listOfObservers.Remove(_currentObserver);
}
}
}
This is the code for the Observable
public class MyObservable : IObservable<int>
{
private List<IObserver<int>> _listOfSubscribedObservers = new List<IObserver<int>>();
public IDisposable Subscribe(IObserver<int> observer)
{
if (!_listOfSubscribedObservers.Contains(observer))
{
_listOfSubscribedObservers.Add(observer);
}
Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(1000);
observer.OnNext(i);
}
observer.OnCompleted();
});
return new SubscriptionHandler(_listOfSubscribedObservers, observer);
}
}
I have a feeling that I am missing something. There has to be a built in way to return a meaningful Disposable for handcrafted Observable or this is something which comes only with Observable create helper methods?
I should make clear that all of this is a demonstration of Rx design internals. You can have a look at classes AnonymousObservable<T>
,
AnonymousObserver<T>
, and AnonymousDisposable
, which is how the framework does it. Pretty straight forward. However, you should almost never use any of this code, rather use things like Disposable.Create
and Observable.Create
. If you're implementing an IObservable
, you're almost definitely doing it wrong.
Here's the basic idea: The observable needs to produce an IDisposable
which removes the relevant observer from the observable's internal list of observers. Your code is (wrongly) removing all observers from the internal list.
Here's a basic disposable which makes it easy to create functionally. With this code, GenericDisposable.Create
is the same as Disposable.Create(Action a)
.
public class GenericDisposable : IDisposable
{
public static IDisposable Create(Action disposeAction)
{
return new GenericDisposable(disposeAction);
}
private readonly Action _disposeAction;
public GenericDisposable(Action disposeAction)
{
_disposeAction = disposeAction;
}
public void Dispose()
{
_disposeAction();
}
}
...and here's an example observable implementation:
public class SendIntMessages : IObservable<int>
{
private readonly HashSet<IObserver<int>> _observers = new HashSet<IObserver<int>>();
protected void OnNext(int i)
{
foreach (var o in _observers)
o.OnNext(i);
}
protected void OnError(Exception e)
{
foreach (var o in _observers)
o.OnError(e);
}
protected void OnCompleted()
{
foreach (var o in _observers)
o.OnCompleted();
}
public void SendIntMessage(int i)
{
OnNext(i);
}
public void EndStream()
{
OnCompleted();
}
public void SendError(Exception e)
{
OnError(e);
}
public IDisposable Subscribe(IObserver<int> observer)
{
_observers.Add(observer);
return GenericDisposable.Create(() => _observers.Remove(observer));
}
}
This is a long-running, hot observable. It keeps track of its observers, and the disposable unsubscribes them.
Consider in contrast this observable:
public class CountTo5 : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnNext(4);
observer.OnNext(5);
return GenericDisposable.Create(() => {});
}
}
This is a 'cold' observable that runs immediately. There's no way to unsubscribe in the middle: By the time you get the disposable, the observable has concluded.
Disposable.Empty
is a simple short hand for DisposableCreate(() => {})
.