Tags: c#, events, system.reactive, reactive-programming, cancellation

Cancel RX.Net Observer's ongoing OnNext methods


As described in my original question (see "Correlate interdependent Event Streams with RX.Net"), I have an Rx.NET event stream that should call the observer's OnNext method only as long as a certain other event has not been triggered. Basically: handle Change-* events as long as the system is connected, pause while it is disconnected, and resume handling the Change-* events once the system has reconnected.

However, while this works smoothly for new events, how would I cancel, or signal cancellation to, OnNext() calls that are already in progress?


Solution

  • Since your observer is already written to accept a CancellationToken, we can modify your Rx stream to supply one along with the event data. We'll use Rx's CancellationDisposable, which is disposed (cancelling its token) whenever the stream is unsubscribed.

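    // Needs: using System.Reactive.Disposables; using System.Reactive.Linq; using System.Threading;
    // As an extension method, this must be declared inside a static class.
    // Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
    public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)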
    {
        return Observable.Using(
            () => new CancellationDisposable(),
            cts => source.Select(item => Tuple.Create(cts.Token, item)));
    }
    

    Putting this together with the solution from the other question:

    DataSourceLoaded
        .SelectMany(_ => DataSourceFieldChanged
            .Throttle(x)
            .CancelOnUnsubscribe()
            .TakeUntil(DataSourceLoaded))
        .Subscribe(c => handler(c.Item1, c.Item2));
    

    When the TakeUntil clause is triggered, it will unsubscribe from the CancelOnUnsubscribe observable, which will in turn dispose of the CancellationDisposable and cause the token to be cancelled. Your observer can watch this token and stop its work when this happens.
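
    On the observer side, "watching the token" could look roughly like the sketch below. It is not from the original question: the `HandlerSketch` class, the `Handle` signature, and the step-based loop are hypothetical, and a plain `CancellationTokenSource` stands in for the `CancellationDisposable` so that the example runs without Rx. The point is only that cancellation is cooperative: the handler has to check the token between units of work.

    ```csharp
    using System;
    using System.Threading;

    public static class HandlerSketch
    {
        // Hypothetical handler: does its work in small steps and checks the
        // token before each step, so it stops soon after cancellation is requested.
        // Returns the number of steps that were completed.
        public static int Handle(CancellationToken token, string change, int steps, Action<int> onStep)
        {
            int done = 0;
            for (int i = 0; i < steps; i++)
            {
                if (token.IsCancellationRequested)
                {
                    break; // the stream was unsubscribed; abandon the remaining work
                }
                onStep(i); // one unit of (simulated) work for this change
                done++;
            }
            return done;
        }

        public static void Main()
        {
            // Stand-in for the CancellationDisposable created by CancelOnUnsubscribe.
            using (var cts = new CancellationTokenSource())
            {
                // Simulate the stream being unsubscribed while step 2 is running.
                int completed = Handle(cts.Token, "field A", 10, i =>
                {
                    if (i == 2) cts.Cancel();
                });
                Console.WriteLine(completed); // prints 3 (steps 0, 1 and 2 ran)
            }
        }
    }
    ```

    With the stream above, such a handler would be called as `handler(c.Item1, c.Item2)`. If the handler calls APIs that accept a CancellationToken themselves, passing the token on to them is usually simpler than polling `IsCancellationRequested`.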