Search code examples
c#wpfsystem.reactivereactiveuirx.net

Subscribe on observable not firing when using an async extension method


For a new project I'm using ReactiveUI. I'm starting to really like the ideas behind reactive programming, but still have some trouble wrapping my head around some of the concepts and writing idiomatic code.

In this example, I've got a very basic MainWindow with a TextBox named "UserId":

<Window
    x:Class="ReactiveUiDemo.MainWindow"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    Title="MainWindow"
    Width="800"
    Height="450">
    <StackPanel>
        <TextBox x:Name="UserId" />
    </StackPanel>
</Window>

The TextBox is bound to a property on the corresponding viewmodel in the constructor of the MainWindow:

using System.Reactive.Disposables;
using System.Windows;
using ReactiveUI;

namespace ReactiveUiDemo
{
    public partial class MainWindow : Window, IViewFor<MainWindowViewModel>
    {
        public MainWindowViewModel ViewModel { get; set; }

        object IViewFor.ViewModel
        {
            get => ViewModel;
            set => ViewModel = (MainWindowViewModel)value;
        }

        public MainWindow()
        {
            InitializeComponent();
            ViewModel = new MainWindowViewModel();

            this.WhenActivated(disposables =>
            {
                this
                    .Bind(ViewModel, vm => vm.UserId, v => v.UserId.Text)
                    .DisposeWith(disposables);
            });
        }
    }
}

The ViewModel behind this is then observing this property (to which the ReactiveUI.Fody [Reactive] attribute is applied). When 4 digits have been entered it tries to look up a user, which either succeeds in case of UserId 1234 or fails. This result is then shown using a MessageBox.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Windows;
using ReactiveUI;
using ReactiveUI.Fody.Helpers;

namespace ReactiveUiDemo
{
    public sealed class MainWindowViewModel : ReactiveObject
    {
        [Reactive]
        public string UserId { get; private set; }

        public MainWindowViewModel()
        {
            this
                .WhenAnyValue(t => t.UserId)
                .Where(u => (u?.Length ?? 0) == 4)
                .Where(u => int.TryParse(u, out _))
                .Select(i => int.Parse(i))
                .Select(i => GetUserName(i))
                .Match(
                    userName => DisplaySuccess(userName),
                    failure => DisplayError(failure))
                .Do(_ => UserId = string.Empty)
                .Subscribe(
                    _ => MessageBox.Show("OnNext"),
                    _ => MessageBox.Show("OnError"),
                    () => MessageBox.Show("OnCompleted"));
        }

        private enum Failure { UserNotFound }

        private Result<string, Failure> GetUserName(int userId)
        {
            if (userId == 1234)
                return "Waldo";

            return Failure.UserNotFound;
        }

        private async Task<Unit> DisplayError(Failure failure)
        {
            MessageBox.Show($"Error: {failure}.");
            await Task.CompletedTask;

            return Unit.Default;
        }

        private async Task<Unit> DisplaySuccess(string userName)
        {
            MessageBox.Show($"Found {userName}!");
            await Task.CompletedTask;

            return Unit.Default;
        }
    }
}

The class "Result" (or this stripped version of it) contains either a TSuccess or TFailure:

using System;

namespace ReactiveUiDemo
{
    public sealed class Result<TSuccess, TFailure>
    {
        private readonly bool _isSuccess;
        private readonly TSuccess _success;
        private readonly TFailure _failure;

        private Result(TSuccess value)
        {
            _isSuccess = true;
            _success = value;
            _failure = default;
        }

        private Result(TFailure value)
        {
            _isSuccess = false;
            _success = default;
            _failure = value;
        }

        public TResult Match<TResult>(Func<TSuccess, TResult> successFunc, Func<TFailure, TResult> failureFunc)
            => _isSuccess ? successFunc(_success) : failureFunc(_failure);

        public static implicit operator Result<TSuccess, TFailure>(TSuccess value)
            => new Result<TSuccess, TFailure>(value);

        public static implicit operator Result<TSuccess, TFailure>(TFailure value)
            => new Result<TSuccess, TFailure>(value);
    }
}

The extension method which gives me headaches is the Match method, which is defined as follows:

using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace ReactiveUiDemo
{
    public static class ObservableExtensions
    {
        public static IObservable<TResult> Match<TSuccess, TFailure, TResult>(
            this IObservable<Result<TSuccess, TFailure>> source,
            Func<TSuccess, Task<TResult>> success,
            Func<TFailure, Task<TResult>> failure)
            => Observable.FromAsync(async () => await source.SelectMany(result => result.Match(success, failure).ToObservable()));
    }
}

The code works as expected until the Match extension method. DisplayError or DisplaySuccess is being called, but that's the end of it; the actions in Do and Subscribe are not executed. I believe there is a problem with my Match extension method, but I have no idea how to solve it.

As a side note, I guess there is a better way to write this bit:

                .Where(u => (u?.Length ?? 0) == 4)
                .Where(u => int.TryParse(u, out _))
                .Select(int.Parse)

I could imagine a TryParseInt extension method, but perhaps it's not needed?

edit

Updated the extension method based on the answer by @GlennWatson, it works a I'd expect now:

public static IObservable<TResult> Match<TSuccess, TFailure, TResult>(
    this IObservable<Result<TSuccess, TFailure>> source,
    Func<TSuccess, Task<TResult>> success,
    Func<TFailure, Task<TResult>> failure)
    => source.SelectMany(r => r.Match(success, failure).ToObservable());

Solution

  • The FromAsync() method is only meant to be used for Task based system.

    When using Observable and subsequent Linq style methods, try to keep it in Observable form as much as possible.

    In your example, you are awaiting on a Observable and wrapper it in a FromAsync. The SelectMany has a overload that understands about Task based operations.