I have a class that has to perform setup/teardown to be able to produce events that get wrapped as observables. For brevity, I used Observable.Interval to produce an observable stream in the code below. What changes do I need in this code to get Start and Stop to be called twice? Effectively Start would need to be called each time the refcount increments to 1, and Stop called each time refcount decrements to 0.
static async Task Main(string[] args)
{
var provider = new SomeProvider();
var s1 = provider.Subscribe(on1);
var s2 = provider.Subscribe(on2);
await Task.Delay(5000);
s1.Dispose();
s2.Dispose();
var s3 = provider.Subscribe(on1);
var s4 = provider.Subscribe(on2);
await Task.Delay(5000);
s3.Dispose();
s4.Dispose();
}
private static void on1(long obj) => Console.WriteLine("on1");
private static void on2(long obj) => Console.WriteLine("on2");
public class SomeProvider : IObservable<long>
{
//Example observable stream
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).Do(Start).Publish().RefCount();
public IDisposable Subscribe(IObserver<long> observer)
{
var subscription = timer.Subscribe(observer);
return new CompositeDisposable(subscription, Disposable.Create(() => { Stop(); }));
}
private static void Start(long obj) => Console.WriteLine("STARTED");
private void Stop() => Console.WriteLine("STOPPED");
}
With the help of the previous answers, I created this extension method:
public static IObservable<T> OnSubscriptionDo<T>(this IObservable<T> source, Action subscribeAction, Action unsubscribeAction = null) =>
(null == unsubscribeAction) ?
source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Publish().RefCount() :
source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Finally(unsubscribeAction).Publish().RefCount();
The "Do" is to provide some indication that this method introduces side effects. The observable stream is then defined like so:
readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).OnSubscriptionDo(Start, Stop);