I am investigating a complex issue where, for some reason, a BehaviorSubject emits an error to its observers.
That happens even though nobody is actually interacting with that observable.
The only connection seems to be that the subject and the code that actually fails are both evaluated on the UI thread of my WPF application.
This is roughly what my code looks like, although the actual cause is likely missing from it:
private readonly BehaviorSubject<int> _exampleSubject;

public void NotifyChanged(int newValue)
{
    _exampleSubject.OnNext(newValue);
}

public IObservable<int> _ConstructStream()
{
    return _exampleSubject
        .Do(
            onNext: count => Console.WriteLine($"_exampleSubject.OnNext: {count} (thread: {Thread.CurrentThread.ManagedThreadId})"),
            onError: exception => Console.WriteLine($"_exampleSubject.OnError: {exception} (thread: {Thread.CurrentThread.ManagedThreadId})"),
            onCompleted: () => Console.WriteLine($"_exampleSubject.OnCompleted (thread: {Thread.CurrentThread.ManagedThreadId})"));
}
For some reason, _exampleSubject emits an exception, which makes no sense because OnError is never called anywhere.
In another location, an exception isn't handled properly in an observable, which seems to interfere with it in some way.
Unfortunately, I wasn't able to create a minimal example that reproduces this issue. There seems to be a race condition that only occurs when all of the code is included.
I've managed to reproduce part of the issue, which is discussed below.
To reproduce the issue, use the default WPF template and make the following modifications to App.xaml.cs:
// Target framework:
//   net7.0-windows
// Packages:
//   System.Reactive 6.0.0
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Threading;

namespace WpfApp1
{
    public partial class App : Application
    {
        protected override void OnStartup(StartupEventArgs e)
        {
            base.OnStartup(e);
            // ... Call the example methods here.
        }

        // ... Define the example methods here.
    }
}
private async Task _Example_2()
{
    var dispatcherSynchronizationContext = new DispatcherSynchronizationContext(Application.Current.Dispatcher);

    // Adding this code would no longer reproduce the issue:
    // using var disposable = Observable
    var disposable = Observable
        .Timer(TimeSpan.FromMilliseconds(100))
        .ObserveOn(dispatcherSynchronizationContext)
        .SubscribeOn(dispatcherSynchronizationContext)
        .Subscribe(_ => throw new InvalidOperationException());

    // Wait a bit for things to play out.
    await Task.Delay(TimeSpan.FromMilliseconds(300));
}
When the exception is thrown, the main thread seems to freeze: the entire window stops responding and anything else on the same scheduler stops being processed. Other observables that are not bound to the same scheduler continue briefly before the application closes automatically.
This is more or less what I would expect, because an unhandled exception should terminate the application. Unfortunately, there are no log messages anywhere to indicate that this happened.
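One way to get a log entry instead of a silent crash is to let the failure travel through the pipeline and pass an onError handler to Subscribe. The following is only a minimal console sketch of that idea, not the code from my application: the class and method names are made up for illustration, and Observable.Return stands in for the timer. Note that it moves the throwing code into Select, because an exception thrown directly inside the onNext callback of Subscribe is not delivered to that subscription's onError.

```csharp
using System;
using System.Reactive.Linq;

internal static class ErrorRoutingSketch
{
    // Hypothetical helper: returns the message that reached onError, or null if none did.
    public static string? CaptureError()
    {
        string? seen = null;

        Observable.Return(1)
            // The selector throws; Select catches that and forwards it as OnError.
            .Select<int, int>(_ => throw new InvalidOperationException("boom"))
            .Subscribe(
                _ => { },
                exception => seen = exception.Message); // A real application would log here.

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine($"onError received: {CaptureError()}");
    }
}
```

With the handler in place the exception stays inside the subscription instead of escaping onto the thread that delivered the notification.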
Problem 1: The odd thing here is that if I add a using statement to the disposable, the issue no longer occurs. I would expect that the disposal would occur at the end of the function, after waiting for the delay, but it seems to occur before that.
Looking at this again, I got confused by my own example. After 300ms, it will return from the method, which makes the dispose call. In my head I was thinking about it more like Timeout.InfiniteTimeSpan.
Interestingly, the same issue can be reproduced with Observable.FromAsync combined with Concat, but only if there is no delay before the switch occurs.
private async Task _Example_1()
{
    var dispatcherSynchronizationContext = new DispatcherSynchronizationContext(Application.Current.Dispatcher);

    Observable
        .Timer(TimeSpan.FromMilliseconds(100))
        .ObserveOn(dispatcherSynchronizationContext)
        .SubscribeOn(dispatcherSynchronizationContext)
        .Select(index => Observable.FromAsync(async () =>
        {
            // Adding this code would no longer reproduce the issue:
            // await Task.Delay(TimeSpan.FromMilliseconds(100));
            throw new InvalidOperationException();
        }))
        .Concat()
        .Subscribe();

    // Wait a bit for things to play out.
    await Task.Delay(TimeSpan.FromMilliseconds(300));
}
Problem 2: It seems that something odd happens when execution pauses and then resumes on a different thread, and that changes the behavior. I would expect the dispatcher to die even with the delay added.
I was able to resolve my specific issue, but I still don't understand why the exception was attributed to the BehaviorSubject. What follows at least explains the "problem" I was having.
I don't understand every small detail of what happens, but my understanding is as follows:
FromAsync is essentially a wrapper for StartAsync that defers the execution of the task until a subscription occurs.
The implementation of StartAsync starts your task, converts it with Task.ToObservable(), and returns the result.
The implementation of ToObservable will eventually run the following code:
if (_scheduler == null)
{
    _task.ContinueWith(
        static (t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject!),
        observer,
        cts.Token,
        options,
        TaskScheduler.Current);
}
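Passing TaskScheduler.Current means that the continuation is queued to the current task scheduler instead of being posted to the current synchronization context. When no task is executing, TaskScheduler.Current is the default scheduler, so the continuation ends up on a thread pool thread.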
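To check that reading outside of Rx, here is a small console sketch that only uses the task library. CountingContext and CountPosts are hypothetical names introduced for this illustration; the context merely counts how often work is posted to it, as a stand-in for the WPF dispatcher context. The expectation, assuming I read the documentation correctly, is that a continuation scheduled with TaskScheduler.Current never touches the context, while one scheduled with TaskScheduler.FromCurrentSynchronizationContext() is posted to it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context that only counts how often work is posted to it.
internal sealed class CountingContext : SynchronizationContext
{
    public int Posts;

    public override void Post(SendOrPostCallback d, object? state)
    {
        Interlocked.Increment(ref Posts);
        base.Post(d, state); // The base implementation queues to the thread pool.
    }
}

internal static class SchedulerSketch
{
    // Runs one continuation on the chosen scheduler and reports the posts it caused.
    public static int CountPosts(bool useContextScheduler)
    {
        var previous = SynchronizationContext.Current;
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            var scheduler = useContextScheduler
                ? TaskScheduler.FromCurrentSynchronizationContext()
                : TaskScheduler.Current;

            Task.Delay(50)
                .ContinueWith(_ => { }, CancellationToken.None, TaskContinuationOptions.None, scheduler)
                .Wait();

            return context.Posts;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    public static void Main()
    {
        Console.WriteLine($"TaskScheduler.Current: {CountPosts(false)} post(s)");
        Console.WriteLine($"FromCurrentSynchronizationContext: {CountPosts(true)} post(s)");
    }
}
```

This mirrors the situation in ToObservable: installing a synchronization context on the calling thread is not enough to pull a ContinueWith continuation back onto it.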
This program reproduces the issue perfectly:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Threading;

internal class Program
{
    public static async Task Main()
    {
        Dispatcher.CurrentDispatcher.InvokeAsync(MainOnDispatcher);
        Dispatcher.Run();
    }

    public static void MainOnDispatcher()
    {
        var triggerObservable = new[] { 1 }.ToObservable();

        triggerObservable
            .ObserveOnDispatcher()
            .Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) before from async"))
            .Select(_ => Observable.FromAsync(async () => await ExampleAsync()))
            .Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after from async"))
            .Concat()
            .Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after concat"))
            .Subscribe();
    }

    private static async Task ExampleAsync()
    {
        Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) about to wait");
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) done waiting");
    }
}
(thread: 1) before from async
(thread: 1) after from async
(thread: 1) about to wait
(thread: 1) done waiting
(thread: 4) after concat
Notice how the thread is switched when you consume the result of FromAsync, even though all the code inside it ran on the main thread. I am unsure whether this is the intended behavior or simply an implementation issue, so I created an issue on GitHub.
If you remove the delay, the behaviour changes, because it will use the ImmediateScheduler by default (then _scheduler is not null). That's what made this even more confusing.
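If downstream code has to stay on one thread regardless of where FromAsync delivers its result, one option is to add another ObserveOn after Concat. The sketch below is a console approximation under stated assumptions, not the fix from my application: an EventLoopScheduler (a single dedicated thread) stands in for the WPF dispatcher, Task.Delay stands in for the real work, and MarshalBackSketch and Run are names invented for this example.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

internal static class MarshalBackSketch
{
    // Returns the thread id before FromAsync and the one seen after the second ObserveOn.
    public static (int Before, int After) Run()
    {
        // Stand-in for the dispatcher: one dedicated thread that runs all scheduled work.
        using var eventLoop = new EventLoopScheduler();
        int before = 0, after = 0;

        Observable.Return(1)
            .ObserveOn(eventLoop)
            .Do(_ => before = Environment.CurrentManagedThreadId)
            .Select(_ => Observable.FromAsync(() => Task.Delay(TimeSpan.FromMilliseconds(100))))
            .Concat()
            // Without this line the next operator runs wherever the task continuation ran.
            .ObserveOn(eventLoop)
            .Do(_ => after = Environment.CurrentManagedThreadId)
            .Wait(); // Blocks the calling thread until the sequence completes.

        return (before, after);
    }

    public static void Main()
    {
        var (before, after) = Run();
        Console.WriteLine($"before FromAsync: thread {before}, after ObserveOn: thread {after}");
    }
}
```

In a WPF application the second ObserveOn would take the dispatcher (or its synchronization context) instead of the event loop; the point is only that the hop back has to be requested explicitly.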
That's pretty much what caused the issue for me. For some unknown reason, the exception was actually attributed to the wrong observable, and my debugger stopped at the BehaviorSubject discussed above. I am still unsure what caused that.