Tags: c#, system.reactive, observable, idisposable, subject-observer

Reactive Observable Subscription: Stop Subscription and Renew Subscription


I have an Observable built from a BlockingCollection that I use as a queue:

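IObservable<ProcessHoldTransactionData> GetObservable()
{
    // Consume the queue on the task pool so the blocking enumeration
    // does not tie up the subscribing thread.
    return _queue.GetConsumingEnumerable().ToObservable(TaskPoolScheduler.Default);
}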

and I subscribe to it:

void StartSubscription()
{
    _subscription = GetObservable().Subscribe(
                data => OnNextSubscribe(data),
                ex => _logger.Error("Error"),
                () => _logger.Warn("Complete"));
}

Now I have another Observable:

var timer = Observable.Interval(TimeSpan.FromSeconds(60));
_subscriptionTimer = timer.Subscribe(tick =>
{
    OnTimerNextSubscribe();
});

When OnTimerNextSubscribe starts, I would like to stop the _subscription subscription, and renew it when OnTimerNextSubscribe finishes.

What is the best practice for that?
Should I dispose _subscription and call StartSubscription() again?


Solution

  • There are basically two alternatives. One is to dispose the subscription and then restart it. The other is to create some sort of on/off signal observable and filter _subscription accordingly:

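    void StartSubscription(IObservable<bool> onOffSignal)
    {
        _subscription = GetObservable()
            // Pair each item with the latest on/off value. Nothing is emitted
            // until onOffSignal has produced at least one value.
            .WithLatestFrom(onOffSignal, (s, b) => b ? Observable.Return(s) : Observable.Empty(s))
            // Flatten: items that arrive while the signal is "off" are dropped.
            .Merge()
            .Subscribe(
                    data => OnNextSubscribe(data),
                    ex => _logger.Error("Error"),
                    () => _logger.Warn("Complete")
            );
    }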
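
The answer shows code only for the on/off signal alternative, so here is a minimal sketch of the other one, dispose then restart. It assumes the System.Reactive package is referenced. `RestartableSubscription` and `RunPaused` are hypothetical names introduced for illustration and are not from the original post. The sketch relies on `SerialDisposable`, which disposes the previously held disposable whenever a new one is assigned.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

// Hypothetical helper (not from the original post): owns one subscription
// that can be stopped and started again.
public sealed class RestartableSubscription<T> : IDisposable
{
    private readonly Func<IObservable<T>> _sourceFactory;
    private readonly Action<T> _onNext;

    // Assigning a new inner disposable disposes the previous one.
    private readonly SerialDisposable _current = new SerialDisposable();

    public RestartableSubscription(Func<IObservable<T>> sourceFactory, Action<T> onNext)
    {
        _sourceFactory = sourceFactory;
        _onNext = onNext;
    }

    // Subscribe to a fresh observable from the factory.
    public void Start() => _current.Disposable = _sourceFactory().Subscribe(_onNext);

    // Dispose the active subscription, if any.
    public void Stop() => _current.Disposable = Disposable.Empty;

    // Stop, run the work, then resubscribe even if the work throws.
    public void RunPaused(Action work)
    {
        Stop();
        try { work(); }
        finally { Start(); }
    }

    public void Dispose() => _current.Dispose();
}
```

In the question's terms, this could be constructed as `new RestartableSubscription<ProcessHoldTransactionData>(GetObservable, OnNextSubscribe)`, with the timer callback calling `RunPaused(OnTimerNextSubscribe)`. Unlike the filtering approach, which keeps consuming the queue and drops items while the signal is off, nothing reads from the queue while the subscription is stopped, so items should stay queued until the restart. It is still worth testing how an item that is being dequeued at the moment of disposal is handled.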