c#, system.reactive, cancellation

How to cancel an observable sequence


I have a very simple IObservable<int> that acts as a pulse generator, emitting a value every 500 ms:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500));

I also have a CancellationTokenSource that is used to cancel other work running at the same time.

How can I use the cancellation token source to cancel my observable sequence?


Solution

  • If you're using GenerateWithTime (since replaced by the Generate overload that takes a TimeSpan selector), you can change the second parameter, the condition, so that it evaluates the state of the cancellation token (here ts is your CancellationTokenSource), as follows:

    var pulses = Observable.Generate(0,
        i => !ts.IsCancellationRequested,
        i => i + 1,
        i => i,
        i => TimeSpan.FromMilliseconds(500));
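
    To show where ts comes from and what cancelling looks like end to end, here is a minimal sketch of the snippet above as a small console program. It assumes the System.Reactive package is referenced; PulseDemo, CreatePulses and the 2-second CancelAfter are made up for illustration. Note that the condition is only re-evaluated when the sequence advances, so completion may lag the cancel request by up to one interval, and a value that was already scheduled may still be delivered.

    ```csharp
    using System;
    using System.Reactive.Linq;
    using System.Threading;

    static class PulseDemo
    {
        // Hypothetical helper: the Generate call from above, with the token source made explicit.
        public static IObservable<int> CreatePulses(CancellationTokenSource ts, TimeSpan period) =>
            Observable.Generate(
                0,
                i => !ts.IsCancellationRequested, // checked each time the sequence advances
                i => i + 1,
                i => i,
                i => period);

        static void Main()
        {
            // Assumption: in real code this is the source that also cancels your other work.
            using var ts = new CancellationTokenSource();
            using var done = new ManualResetEventSlim();

            using var subscription = CreatePulses(ts, TimeSpan.FromMilliseconds(500)).Subscribe(
                i => Console.WriteLine($"pulse {i}"),
                () => done.Set()); // OnCompleted fires once the condition returns false

            ts.CancelAfter(TimeSpan.FromSeconds(2)); // simulate cancellation from elsewhere
            done.Wait();                             // block until the sequence completes
            Console.WriteLine("sequence completed");
        }
    }
    ```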
    

    Alternatively, if the event that triggers the cancellation can itself be exposed as an observable, you could use something like the following:

    pulses.TakeUntil(CancelRequested);
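
    CancelRequested is not defined above. If all you have is the CancellationToken, one possible way to get such an observable is to wrap CancellationToken.Register in Observable.Create. This is only a sketch under that assumption; the class name and the AsCancellationSignal extension method are hypothetical, not part of Rx.

    ```csharp
    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Threading;

    static class CancellationTokenExtensions
    {
        // Hypothetical helper: emits a single Unit and completes when the token is cancelled.
        public static IObservable<Unit> AsCancellationSignal(this CancellationToken token) =>
            Observable.Create<Unit>(observer =>
            {
                // Register runs the callback on cancellation (immediately if already cancelled).
                IDisposable registration = token.Register(() =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });

                // Unsubscribing disposes the registration and so removes the callback.
                return registration;
            });
    }
    ```

    With that in place, the line above could be written as var cancellable = pulses.TakeUntil(ts.Token.AsCancellationSignal()); and subscribers to cancellable see OnCompleted as soon as the token is cancelled, without waiting for the next pulse.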
    

    I posted a more detailed explanation at http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable as well.