c# system.reactive rx.net

Executing Task based methods in Observable chain => IObservable<IObservable<Unit>>


I have a lot of code that is reactive but needs to call into Task based methods.

For example, in this snippet PracticeIdChanged is an IObservable. When PracticeIdChanged fires, I want the system to react by reloading some stuff and have code that looks like this:

PracticeIdChanged.Subscribe(async x => {
    SelectedChargeInfo.Item = null;
    await LoadAsync().ConfigureAwait(false);
});

Although it seems to work OK, I get warnings about executing async code in the Subscribe. Additionally, I consider this a code smell, as I am mixing two separate threading models, which I think may come back to bite me later.

Refactoring it like this works, even without a combining operator such as .Merge(), .Switch() or .Concat():

PracticeIdChanged
    .Do(_ => SelectedChargeInfo.Item = null)
    .Select(_ => LoadAsync().ToObservable())
    .Subscribe();

When PracticeIdChanged fires, the LoadAsync method executes.

The Select results in an IObservable<IObservable<Unit>>, which looks odd. Is this OK, or does it require a combining operator like .Merge() or .Switch()?

In many places, I use SelectMany to execute the Task-based method, but it requires returning Task<T>, which would mean changing the signature of the Task-based method in the example above, and I do not want to do that.


Solution

  • It depends on what kind of notifications you expect from the resulting sequence, and what behavior you want in case of errors. In your example you call .Subscribe() on the sequence without passing any handler (onNext/onError/onCompleted), indicating that you are not interested in being notified of anything. You don't care about the completion of the asynchronous operations, so all of them are essentially fire-and-forget. Also, a failure of one asynchronous operation will have no effect on the rest: the already started operations will continue running (they won't get canceled), and starting new operations will not be impeded. Finally, a failure of the source sequence (PracticeIdChanged) will result in an unhandled exception that will crash the process. If that's the behavior you want, then your current setup is what you need.
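
    To make the difference concrete, here is a minimal sketch that contrasts the nested, handler-less subscription with a flattened one that passes an onError handler. The Subject and the always-failing LoadAsync are hypothetical stand-ins for the question's PracticeIdChanged and LoadAsync, not the real members:

    ```csharp
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Reactive.Threading.Tasks;
    using System.Threading.Tasks;

    class Program
    {
        // Hypothetical stand-in for the question's LoadAsync; it always fails here.
        static async Task LoadAsync()
        {
            await Task.Delay(100);
            throw new InvalidOperationException("load failed");
        }

        static async Task Main()
        {
            // Hypothetical stand-in for PracticeIdChanged.
            var practiceIdChanged = new Subject<int>();

            // Nested sequence, no handlers: LoadAsync is started, but the inner
            // observables are never subscribed, so its failure is reported nowhere.
            using var fireAndForget = practiceIdChanged
                .Select(_ => LoadAsync().ToObservable())
                .Subscribe();

            // Flattened with Merge and given an onError handler: the failure of
            // LoadAsync reaches the handler and terminates this subscription.
            using var observed = practiceIdChanged
                .Select(_ => LoadAsync().ToObservable())
                .Merge()
                .Subscribe(
                    _ => { },
                    ex => Console.WriteLine($"Load failed: {ex.Message}"));

            practiceIdChanged.OnNext(1);
            await Task.Delay(500); // give both LoadAsync calls time to finish
        }
    }
    ```

    Note that in the flattened variant the first error ends the subscription, so later PracticeIdChanged notifications are no longer handled. Whether that is acceptable depends on the behavior you want.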

    For comparison, let's consider this setup:

    await PracticeIdChanged
        .Do(_ => SelectedChargeInfo.Item = null)
        .Select(_ => Observable.FromAsync(ct => LoadAsync(ct)))
        .Merge()
        .DefaultIfEmpty();
    

    This setup assumes that the LoadAsync method has a CancellationToken parameter. The resulting sequence is awaited. The await will complete when the source sequence has completed and all LoadAsync operations have completed, or as soon as any one of them has failed or the source sequence has failed. In case of failure, all currently running asynchronous operations will receive a cancellation signal, so that they can bail out quickly. The await will not wait for their completion though. Only the first error that occurred will be propagated as an exception. This exception can be handled by wrapping the await in a try/catch block. There is no possibility of an uncatchable, process-crashing, unhandled exception.
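
    As a rough sketch of the try/catch handling described above, assuming a finite stand-in for PracticeIdChanged and a hypothetical LoadAsync(int, CancellationToken) that fails for one particular id (both are illustrative, not the question's real code):

    ```csharp
    using System;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        // Hypothetical cancellable LoadAsync: id 2 fails quickly, the others run longer.
        static async Task LoadAsync(int id, CancellationToken ct)
        {
            await Task.Delay(id == 2 ? 50 : 1000, ct);
            if (id == 2) throw new InvalidOperationException($"load {id} failed");
        }

        static async Task Main()
        {
            // Finite stand-in for PracticeIdChanged, so the sequence can complete.
            var practiceIdChanged = new[] { 1, 2, 3 }.ToObservable();

            try
            {
                await practiceIdChanged
                    .Select(id => Observable.FromAsync(ct => LoadAsync(id, ct)))
                    .Merge()
                    .DefaultIfEmpty();
                Console.WriteLine("All loads completed");
            }
            catch (Exception ex)
            {
                // Only the first error is propagated; when it occurs, the other
                // running operations get their CancellationToken signaled.
                Console.WriteLine($"Caught: {ex.Message}");
            }
        }
    }
    ```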

    The purpose of the DefaultIfEmpty at the end of the chain is to prevent an InvalidOperationException in case the source sequence emits zero elements. It's a workaround for a strange "feature" of empty observable sequences: they throw when waited on, either synchronously or asynchronously.
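
    The effect of DefaultIfEmpty can be seen in isolation with this small sketch, which uses Observable.Empty<Unit>() as an illustrative stand-in for a source that emits nothing:

    ```csharp
    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Threading.Tasks;

    class Program
    {
        static async Task Main()
        {
            // Stand-in for a sequence that completes without emitting anything.
            var empty = Observable.Empty<Unit>();

            try
            {
                await empty; // awaiting an empty sequence throws
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine($"Without DefaultIfEmpty: {ex.Message}");
            }

            // DefaultIfEmpty emits default(Unit) when the source is empty,
            // so the await has a value to return and does not throw.
            await empty.DefaultIfEmpty();
            Console.WriteLine("With DefaultIfEmpty: completed normally");
        }
    }
    ```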