I'm attempting to use Reactive Extensions (Rx) to process a stream of data. The processing of each element may take some time, though. To stop the processing, I'm using a CancellationToken, which effectively stops the subscription.
When cancellation has been requested, how do I gracefully finish the current work and terminate properly without losing any data?
```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));
observable
    .Subscribe(
        value =>
        {
            Console.WriteLine(value);
            Thread.Sleep(500); // Simulate processing
            if (cts.Token.IsCancellationRequested)
            {
                Console.WriteLine("Cancellation detected on {0}.", value);
                Thread.Sleep(500); // Simulate some time-consuming shutdown
                Console.WriteLine("Cleaning up done for {0}.", value);
            }
        },
        () => Console.WriteLine("Completed"),
        cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
```
Output:
```
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
```
As the output shows, "Job terminated." is not the last line, which means the cleanup might not get enough time to finish before the application terminates.
Expected output:
```
0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.
```
Here "Job terminated." is the very last line printed. The cancellation and cleanup steps are allowed to take their time.
Observables are awaitable (and waitable, via Wait). Subscriptions to observables are not. So if you want to wait for your subscription code to complete, without resorting to artificial solutions like ManualResetEvents, you should make your subscription code a side effect of a derived observable, and await (or Wait) that observable. The example in your question has additional requirements that complicate matters a bit, but not by much:
1. You want to do other things between subscribing to the observable and waiting for it to complete (Console.ReadLine() etc.).
2. You want to terminate the observable when a CancellationToken is canceled.
Below is an example of how to address these requirements. It shows just one of the many available ways to solve this problem:
```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));
var withCancellation = observable
    .TakeUntil(Observable.Create<Unit>(observer =>
        cts.Token.Register(() => observer.OnNext(default))));
var withSideEffectsAndCancellation = withCancellation
    .Do(value =>
    {
        Console.WriteLine(value);
        Thread.Sleep(500);
        if (cts.Token.IsCancellationRequested)
        {
            Console.WriteLine("Cancellation detected on {0}.", value);
            Thread.Sleep(500);
            Console.WriteLine("Cleaning up done for {0}.", value);
        }
    }, () => Console.WriteLine("Completed"));
var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
    .Publish()
    .AutoConnect(0);
Console.ReadLine();
cts.Cancel();
hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");
```
Explanation:
The .TakeUntil...cts.Token.Register... construct is an idiomatic way to unsubscribe instantly from the Interval observable when the cts.Token is canceled. It is taken from a related question. You could also use the simpler .TakeWhile(x => !cts.Token.IsCancellationRequested), provided that you are OK with a slightly less responsive cancellation.
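A minimal sketch of the TakeWhile variant, assuming a .NET 6+ console project (top-level statements) that references the System.Reactive NuGet package. To keep it self-contained, the token cancels itself after an arbitrarily chosen 600 ms instead of waiting for Console.ReadLine(). Because TakeWhile inspects the token only when the next element arrives, the sequence completes on the first tick after cancellation, not at the moment of cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

// Assumption for the demo: cancellation is requested after ~600 ms,
// i.e. between the 2nd and 3rd tick of a 250 ms interval.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(600));

// TakeWhile checks the token only when a new element arrives, so the
// sequence completes on the first tick after cancellation (about 750 ms).
IList<long> received = await Observable
    .Interval(TimeSpan.FromMilliseconds(250))
    .TakeWhile(_ => !cts.Token.IsCancellationRequested)
    .ToList(); // ToList always emits exactly one list, even an empty one

Console.WriteLine($"Received {received.Count} element(s).");
```

With a 250 ms interval this typically receives the elements 0 and 1, but the exact count depends on timer scheduling.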
The Do operator is a natural way to perform the subscription side effects, because it accepts the same parameters as the Subscribe method.
The .Publish().AutoConnect(0) call makes the sequence hot right away. Unlike the RefCount operator, AutoConnect does not disconnect from the underlying observable when its subscribers unsubscribe, but in this particular case that functionality is not needed: the lifetime of the underlying observable is already controlled by the CancellationToken that we attached previously.
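To make "hot right away" concrete, here is a small sketch under the same assumptions (top-level statements, System.Reactive package). Interval(...).Take(3) is a stand-in for the cancellable source, and the 50 ms period and 500 ms delay are arbitrary. Nobody subscribes to either pipeline during the delay, yet only the published one runs its Do side effect.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

int coldSeen = 0;
int hotSeen = 0;

// Cold: the Do side effect runs only when somebody subscribes, and nobody does.
IObservable<long> cold = Observable.Interval(TimeSpan.FromMilliseconds(50))
    .Take(3)
    .Do(_ => Interlocked.Increment(ref coldSeen));

// Hot: AutoConnect(0) connects to the source immediately, with zero subscribers.
IObservable<long> hot = Observable.Interval(TimeSpan.FromMilliseconds(50))
    .Take(3)
    .Do(_ => Interlocked.Increment(ref hotSeen))
    .Publish()
    .AutoConnect(0);

await Task.Delay(500); // ample time for three 50 ms ticks

// The hot sequence has already completed, so a late subscriber receives only
// OnCompleted; DefaultIfEmpty supplies default(long) instead of throwing.
long lateLast = await hot.DefaultIfEmpty();

Console.WriteLine($"cold: {Volatile.Read(ref coldSeen)}, hot: {Volatile.Read(ref hotSeen)}, late: {lateLast}");
```

This also shows a consequence worth keeping in mind: a subscriber that arrives after the hot sequence has finished sees only the completion notification, not the elements.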
The .DefaultIfEmpty() before the .Wait() is required to prevent an InvalidOperationException in the edge case where the sequence is canceled before producing any element. It is also required if you await the sequence asynchronously. These mechanisms for waiting on an observable (as well as others like the RunAsync and ToTask operators) return the last value emitted by the observable, and they throw when no such value exists.
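The edge case can be reproduced with a short sketch (top-level statements and the System.Reactive package assumed), using Observable.Empty<long>() as a stand-in for a sequence that was canceled before its first element:

```csharp
using System;
using System.Reactive.Linq;

// Completes without producing any element, like a sequence canceled early.
IObservable<long> empty = Observable.Empty<long>();

// Awaiting it directly throws, because there is no last value to return.
bool threw = false;
try
{
    await empty;
}
catch (InvalidOperationException)
{
    threw = true;
}

// DefaultIfEmpty injects default(long), which is 0, when the source is empty,
// so the await completes normally.
long last = await empty.DefaultIfEmpty();

Console.WriteLine($"threw: {threw}, last: {last}");
```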