The following code is from an answer to "What's a good way to run periodic tasks using Rx, with a single concurrent execution restriction?":
/// <summary>
/// Subscribes to a 100 ms interval, echoing "!" for every tick before calling
/// <c>DoSomething</c>, and keeps the subscription alive until Enter is pressed.
/// </summary>
void Main()
{
    var period = TimeSpan.FromMilliseconds(100);
    var ticks = Observable.Interval(period).Do(_ => Console.WriteLine("!"));

    var subscription = ticks.Subscribe(_ => DoSomething());
    using (subscription)
    {
        // Block until the user presses Enter; leaving the using block disposes the subscription.
        Console.ReadLine();
    }
}
/// <summary>
/// Simulates one second of work: writes "&lt;" and the current time, sleeps for
/// one second, then writes "&gt;" and ends the line.
/// </summary>
private void DoSomething()
{
    Console.Write("<");
    string startedAt = DateTime.Now.ToString("HH:mm:ss.fff");
    Console.Write(startedAt);

    // Blocking sleep stands in for a long-running job.
    Thread.Sleep(TimeSpan.FromSeconds(1));

    Console.WriteLine(">");
}
I'm trying to add cancellation support and to test it by stopping the program after five seconds:
using System.Reactive.Linq;
// Simulated unit of work: unless cancellation was already requested, prints
// "<HH:mm:ss.fff", blocks the calling thread for one second, then prints ">".
// The work is synchronous; the returned task is always already completed.
Task DoSomething(CancellationToken cancellationToken = default)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        Console.Write("<");
        string startedAt = DateTime.Now.ToString("HH:mm:ss.fff");
        Console.Write(startedAt);
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Console.WriteLine(">");
    }

    return Task.CompletedTask;
}
// Waits the given number of seconds, announces the cancellation on the console,
// then cancels the supplied source.
// The delay is expressed as a TimeSpan rather than `seconds * 1000` so that a
// large `seconds` value cannot overflow the int millisecond argument.
async Task WaitAndThenCancel(int seconds, CancellationTokenSource cancellationTokenSource)
{
    await Task.Delay(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("Cancelling...");
    cancellationTokenSource.Cancel();
}
// Repro for the question: runs DoSomething on every 100 ms tick and tries to
// stop once cancellationToken is cancelled. As written the stream never stops.
void Main(CancellationToken cancellationToken=default)
{
var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
using (timer. Do(x =>
{
if (cancellationToken.IsCancellationRequested)
{
Console.WriteLine("Canceled - Main");
// This return only leaves the Do callback for the current tick; nothing here
// unsubscribes from the interval, so the message is printed again on every
// later tick.
return; // Need to stop the stream here
}
Console.WriteLine("!");
}).Subscribe(async tick => await DoSomething(cancellationToken)))
{
// The subscription is disposed only after Enter is pressed.
Console.ReadLine();
}
}
// Entry point: schedule cancellation five seconds from now, then run Main with
// the matching token. The cancellation task is deliberately not awaited.
var cts = new CancellationTokenSource();
_ = WaitAndThenCancel(5, cts);
Main(cts.Token);
I expected the code to print the current time for N seconds, then print "Canceled - Main" once and stop. Instead, after N seconds it starts printing "Canceled - Main" and never stops:
!
<15:00:23.823>
!
<15:00:24.836>
!
<15:00:25.853>
!
<15:00:26.860>
!
<15:00:27.863Cancelling...
>
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
....
Using TakeUntil():
using System.Reactive;
using System.Reactive.Linq;
// Simulated unit of work: does nothing when cancellation has already been
// requested; otherwise prints "<HH:mm:ss.fff", waits one second asynchronously,
// then prints ">". The token is only checked on entry.
async Task DoSomething(CancellationToken cancellationToken = default)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        Console.Write("<");
        string startedAt = DateTime.Now.ToString("HH:mm:ss.fff");
        Console.Write(startedAt);
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.WriteLine(">");
    }
}
// Runs DoSomething for each 100 ms tick, one execution at a time, until either
// five seconds have elapsed or cancellationToken is cancelled, and returns once
// the token has been cancelled.
async Task Main3(CancellationToken cancellationToken = default)
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));

    // Bridge the token into Rx. TakeUntil only reacts to an element produced by
    // its "other" sequence, so a value must be pushed before completing;
    // completing alone would leave the tick stream running.
    var cancel = Observable.Create<Unit>(observer => cancellationToken.Register(() =>
    {
        observer.OnNext(Unit.Default);
        observer.OnCompleted();
    }));

    using (timer.Do(x =>
        {
            // NOTE(review): presumably only reachable for a tick that races with
            // cancellation, since TakeUntil(cancel) ends the stream — confirm.
            if (cancellationToken.IsCancellationRequested)
            {
                Console.WriteLine("Canceled - Main");
                return;
            }
            Console.WriteLine("do!");
        })
        .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
        .TakeUntil(cancel)
        // Wrap each tick as a deferred async call and concatenate them, so the
        // next DoSomething starts only after the previous one has finished.
        .Select(_ => Observable.FromAsync(() => DoSomething(cancellationToken)))
        .Concat()
        .Subscribe())
    {
        Console.WriteLine("Will wait for timed cancelation here.");
        try
        {
            // Park here until the token is cancelled; the delay itself never elapses.
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($">{Environment.NewLine}Canceled - Main. In Using");
        }
    }
}
// Entry point: cancel automatically after five seconds and run until then.
// Scheduling a delayed cancel makes the source hold a timer, so it is disposed
// when the program finishes with it.
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(5));
await Main3(cts.Token);
If you want to run an observable and stop it after a set time interval, then you should simply use `.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))`:
// Tick every 100 ms, echo "!" and run DoSomething for each tick, and let the
// stream end by itself once a one-shot five-second timer fires.
var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
var stopAfterFiveSeconds = Observable.Timer(TimeSpan.FromSeconds(5.0));
var subscription = timer
    .Do(_ => Console.WriteLine("!"))
    .TakeUntil(stopAfterFiveSeconds)
    .Subscribe(_ => DoSomething());
using (subscription)
{
    // Keep the process alive; Enter disposes the subscription.
    Console.ReadLine();
}
If you want to use a CancellationToken, then you could use this:
// Tick every 100 ms, echo "!" and run DoSomething for each tick, until the
// cancellation token is cancelled (here: five seconds after start).
var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
using var cts = new CancellationTokenSource();
CancellationToken ct = cts.Token;

// Bridge the token into Rx: when it is cancelled, push one element (TakeUntil
// stops on an element from this sequence) and then complete. The registration
// returned by Register doubles as the subscription's disposable.
var cancel = Observable.Create<Unit>(observer => ct.Register(() =>
{
    observer.OnNext(Unit.Default);
    observer.OnCompleted();
}));

var subscription = timer
    .Do(x => Console.WriteLine("!"))
    .TakeUntil(cancel)
    .Subscribe(tick => DoSomething());
using (subscription)
{
    await Task.Delay(TimeSpan.FromSeconds(5.0));
    cts.Cancel();
    Console.ReadLine();
}