I have a very simple IObservable<int> that acts as a pulse generator every 500ms:
var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));
I also have a CancellationTokenSource (used to cancel other work that is going on simultaneously).
How can I use the cancellation token source to cancel my observable sequence?
If you're using GenerateWithTime (since replaced by the Generate overload that takes a time-selector function), you can change the second parameter, the condition, so that it checks the state of the cancellation token source (ts below):
var pulses = Observable.Generate(0,
i => !ts.IsCancellationRequested,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(500));
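One thing worth knowing about this approach: the condition is only evaluated when the sequence steps, so cancellation is not immediate. The sequence completes on a later tick, and a value that was already scheduled may still arrive. Here is a minimal sketch of the whole flow, assuming the System.Reactive package is referenced; the class name, method name, and parameters are made up for illustration:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class GenerateCancellationDemo
{
    // Hypothetical helper: runs a pulse sequence until the token source is
    // cancelled and returns how many pulses were observed.
    public static int CountPulsesUntilCancelled(TimeSpan period, TimeSpan cancelAfter)
    {
        var ts = new CancellationTokenSource();

        var pulses = Observable.Generate(0,
            i => !ts.IsCancellationRequested, // checked only when the sequence steps
            i => i + 1,
            i => i,
            i => period);

        // Request cancellation after a delay, as some other component might.
        ts.CancelAfter(cancelAfter);

        // Count() yields a single value once the sequence completes;
        // Wait() blocks the calling thread until then.
        return pulses.Count().Wait();
    }
}
```

Calling CountPulsesUntilCancelled(TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(2)) should return after a little over two seconds with a small count; the exact number depends on timing.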
Alternatively, if the event that triggers cancellation can itself be exposed as an observable, you could use something like the following:
pulses.TakeUntil(CancelRequested);
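If all you have is the token, one way to get such an observable is to wrap the token's Register callback. This is only a sketch, assuming the System.Reactive package is referenced; the AsObservable extension method and its class are hypothetical names, not part of Rx:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public static class CancellationTokenSketch
{
    // Hypothetical helper: emits a single Unit and completes when the
    // token is cancelled.
    public static IObservable<Unit> AsObservable(this CancellationToken token)
    {
        return Observable.Create<Unit>(observer =>
        {
            // Register runs the callback on cancellation (or right away
            // if the token has already been cancelled).
            IDisposable registration = token.Register(() =>
            {
                observer.OnNext(Unit.Default);
                observer.OnCompleted();
            });

            // Disposing the subscription removes the callback.
            return registration;
        });
    }
}
```

With that in place, the line above could be written as pulses.TakeUntil(ts.Token.AsObservable()), which ends the sequence as soon as cancellation is requested rather than on the next tick.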
I also posted a more detailed explanation at http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable.