Search code examples
c#system.reactivefluentreactiveui

How to encapsulate the creation of long reactive chains of observables


Currently I have the following block of Rx/ReactiveUI code:

        this.WhenAnyValue(x => x.Listras)
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(millis))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(im => GetArray.FromChannels(im, 0, 1))
            .ObserveOn(RxApp.MainThreadScheduler)
            .ToProperty(this, x => x.Grayscale, out _grayscale);

        this.WhenAnyValue(x => x.Grayscale)
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(millis))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(ar => Gaussian.GaussianConvolution(ar, 1.5))
            .ObserveOn(RxApp.MainThreadScheduler)
            .ToProperty(this, x => x.BlurMenor, out _blurMenor);

        this.WhenAnyValue(x => x.BlurMenor)
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(millis))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
            .ObserveOn(RxApp.MainThreadScheduler)
            .ToProperty(this, x => x.ImagemBlurMenor, out _imagemBlurMenor);

        this.WhenAnyValue(x => x.BlurMenor)
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(millis))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(ar => Gaussian.VerticalGaussianConvolution(ar, 5))
            .ObserveOn(RxApp.MainThreadScheduler)
            .ToProperty(this, x => x.BlurMaior, out _blurMaior);

        this.WhenAnyValue(x => x.BlurMaior)
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(millis))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
            .ObserveOn(RxApp.MainThreadScheduler)
            .ToProperty(this, x => x.ImagemBlurMaior, out _imagemBlurMaior);

        this.WhenAnyValue(x => x.BlurMenor, x => x.BlurMaior)
            .Where(tuple => tuple.Item1 != null && tuple.Item2 != null)
            .Throttle(TimeSpan.FromMilliseconds(millis))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(tuple => ArrayOperations.Diferença(tuple.Item1, tuple.Item2))
            .ObserveOn(RxApp.MainThreadScheduler)
            .ToProperty(this, x => x.Diferença, out _diferença);

        this.WhenAnyValue(x => x.Diferença)
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(millis))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
            .ObserveOn(RxApp.MainThreadScheduler)
            .ToProperty(this, x => x.ImagemDiferença, out _imagemDiferença);

As you can see, it flagrantly violates the DRY principle, but I dont know how I could parameterize away the passing of properties and delegates.

What is the usual way of automating the creation of these method chains in Rx/ReactiveUI?


Solution

  • The beauty of Rx is that you can create your own operators based on other operators. This is because of the functional side of Rx.

    You can create a new operator which encapsulates all the common behavior and takes the small differences as parameters:

    // Put this class somewhere useful. Either beside your VM inside the same namespace
    // Or in a seperate file for all your custom operators
    public static class ObservableMixins
    {
        public static IObservable<TOut> ThrottledSelect<TIn, TOut>(this IObservable<TIn> source, int milliseconds, Func<TIn, TOut> selector) =>
            source
                .Where(item => item != null)
                .Throttle(TimeSpan.FromMilliseconds(milliseconds))
                .ObserveOn(TaskPoolScheduler.Default)
                .Select(selector)
                .ObserveOn(RxApp.MainThreadScheduler)
    }
    

    The use it like this:

    this.WhenAnyValue(x => x.Listras)
        .ThrottledSelect(millis, im => GetArray.FromChannels(im, 0, 1))
        .ToProperty(this, x => x.Grayscale, out _grayscale);
    
    this.WhenAnyValue(x => x.Grayscale)
        .ThrottledSelect(millis, ar => Gaussian.GaussianConvolution(ar, 1.5))
        .ToProperty(this, x => x.BlurMenor, out _blurMenor);
    
    this.WhenAnyValue(x => x.BlurMenor)
        .ThrottledSelect(millis, ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemBlurMenor, out _imagemBlurMenor);
    
    this.WhenAnyValue(x => x.BlurMenor)
        .ThrottledSelect(millis, ar => Gaussian.VerticalGaussianConvolution(ar, 5))
        .ToProperty(this, x => x.BlurMaior, out _blurMaior);
    
    this.WhenAnyValue(x => x.BlurMaior)
        .ThrottledSelect(millis, ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemBlurMaior, out _imagemBlurMaior);
    
    this.WhenAnyValue(x => x.BlurMenor, x => x.BlurMaior)
        // Notice how I'm returning a null if either item is null
        // It will be filtered in the operator
        .Select(tuple => tuple.Item1 != null || tuple.Item2 != null ? null : tuple)
        .ThrottledSelect(millis, tuple => ArrayOperations.Diferença(tuple.Item1, tuple.Item2))
        .ToProperty(this, x => x.Diferença, out _diferença);
    
    this.WhenAnyValue(x => x.Diferença)
        .ThrottledSelect(millis, ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemDiferença, out _imagemDiferença);
    

    If you're feeling a bit less adventurous, you can of course use it as a regular method which takes an observable:

    public IObservable<T> ThrottledSelect<TIn, TOut>(IObservable<TIn> source, int milliseconds, Func<TIn, TOut> selector) =>
        source
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(milliseconds))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(selector)
            .ObserveOn(RxApp.MainThreadScheduler)
    

    And use it like this:

    ThrottledSelect(this.WhenAnyValue(x => x.Diferença), millis, ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemDiferença, out _imagemDiferença);