
How is an observable subscription gracefully terminated?


I'm attempting to use Reactive Extensions (Rx) to process a stream of data. The processing of each element may take some time, though. To stop the processing, I'm using a CancellationToken, which effectively ends the subscription.

When cancellation has been requested, how do I gracefully finish the current work and terminate properly without losing any data?

Example

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

observable
    .Subscribe(
        value =>
            {
                Console.WriteLine(value);
                Thread.Sleep(500); // Simulate processing
                
                if (cts.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation detected on {0}.", value);
                    Thread.Sleep(500); // Simulate some time consuming shutdown
                    Console.WriteLine("Cleaning up done for {0}.", value);
                }
            },
        () => Console.WriteLine("Completed"),
        cts.Token);
        
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");

Output

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.

As the output shows, "Job terminated." is not the last line printed, which means the cleanup would not have had enough time to finish before the application terminated.

Expected Output

0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.

The line "Job terminated" is the very last line to be printed. The "Cancellation" and "Cleaning" lines have been allowed to take their time.

(Edit: Added expected output)


Solution

  • Observables are (a)waitable; subscriptions to observables are not. So if you want to wait for your subscription code to complete without resorting to artificial solutions like ManualResetEvents, you should make your subscription code a side effect of a derived observable and (a)wait that observable. The example in your question has additional requirements that complicate matters a bit, but not by much:

    1. You want to do other things between subscribing to the observable and awaiting its completion (Console.ReadLine() etc).

    2. You want to terminate the observable when a CancellationToken is canceled.

    Below is an example of how to address these requirements. It shows just one of the many available ways to solve this problem:

    using System;
    using System.Reactive;       // Unit
    using System.Reactive.Linq;  // Observable and its operators
    using System.Threading;

    var cts = new CancellationTokenSource();
    cts.Token.Register(() => Console.WriteLine("Token cancelled."));

    var observable = Observable
        .Interval(TimeSpan.FromMilliseconds(250));

    // Complete the sequence as soon as the token is canceled.
    var withCancellation = observable
        .TakeUntil(Observable.Create<Unit>(observer =>
            cts.Token.Register(() => observer.OnNext(default))));

    // The former Subscribe handlers become side effects of the sequence.
    var withSideEffectsAndCancellation = withCancellation
        .Do(value =>
        {
            Console.WriteLine(value);
            Thread.Sleep(500); // Simulate processing

            if (cts.Token.IsCancellationRequested)
            {
                Console.WriteLine("Cancellation detected on {0}.", value);
                Thread.Sleep(500); // Simulate some time-consuming shutdown
                Console.WriteLine("Cleaning up done for {0}.", value);
            }
        }, () => Console.WriteLine("Completed"));

    // Connect immediately, so the side effects run without any subscriber.
    var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
        .Publish()
        .AutoConnect(0);

    Console.ReadLine();
    cts.Cancel();

    // Block until the sequence has completed.
    hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
    // or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
    Console.WriteLine("Job terminated.");
    

    Explanation:

    1. The .TakeUntil...cts.Token.Register... is an idiomatic way to unsubscribe from the Interval observable immediately when the cts.Token is canceled. It is copied from a related question. You could also use the simpler .TakeWhile(x => !cts.Token.IsCancellationRequested), provided that you are OK with slightly less responsive cancellation, since the token is then checked only when the next element arrives.

    2. The Do operator is a natural way to perform the subscription side effects, because it accepts the same parameters as the Subscribe method.

    3. The .Publish().AutoConnect(0); makes the sequence hot right away. The AutoConnect operator offers no way to disconnect from the underlying observable (as opposed to the RefCount operator), but in this particular case the disconnect functionality is not needed. The lifetime of the underlying observable is already controlled by the CancellationToken that we attached previously.

    4. The .DefaultIfEmpty() before the .Wait() is required to prevent an InvalidOperationException in the edge case where the sequence is canceled before producing any element. It is also required if you await the sequence asynchronously. These mechanisms for waiting on an observable (as well as others like the RunAsync and ToTask operators) return the last value emitted by the observable, and they throw when no such value exists.
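
To make points 1 and 4 concrete, here is a minimal sketch that combines the simpler TakeWhile variant with the empty-sequence edge case. It assumes the System.Reactive package and C# top-level statements; the token is canceled up front purely to force a sequence that ends before producing any element, and the names sequence, threw and last are illustrative only.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

var cts = new CancellationTokenSource();
cts.Cancel(); // Assumption for the demo: cancel before the first element arrives.

// Simpler alternative to TakeUntil: the token is checked when each element arrives.
var sequence = Observable
    .Interval(TimeSpan.FromMilliseconds(50))
    .TakeWhile(_ => !cts.Token.IsCancellationRequested);

bool threw = false;
try
{
    // The sequence completes without elements, so there is no last value to return.
    sequence.Wait();
}
catch (InvalidOperationException)
{
    threw = true;
    Console.WriteLine("Wait() threw InvalidOperationException.");
}

// DefaultIfEmpty supplies default(long) when the sequence is empty, so Wait() can return.
long last = sequence.DefaultIfEmpty().Wait();
Console.WriteLine(last); // prints 0
```

Because Interval is cold, each Wait() call here starts its own timer, so the two calls observe independent (and equally empty) sequences.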