Search code examples
c#f#system.reactivereactive-programmingreactivex

Cycling dependencies between streams in reactive programming


Dabbling in reactive programming, I often encounter situations where two streams depend on each other. What is an idiomatic way to solve these cases?

A minimal example: There are buttons A and B, both display a value. Clicking on A must increment the value of A by B. Clicking on B must set the value of B to A.

First solution I could come up with (example in F#, but answers in any language are welcome):

let solution1 buttonA buttonB =
    let mutable lastA = 0
    let mutable lastB = 1
    let a = new Subject<_> ()
    let b = new Subject<_> ()
    (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA) 
    (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonA)
    a.OnNext 0
    b.OnNext 1

This solution uses mutable state and subjects, it not very readable and does not look idiomatic.

The second solution I tried involves creating a method which links two dependent streams together:

let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
    let bProxy = new ReplaySubject<_> () 
    let a = aGivenB bProxy
    let b = bGivenA a
    b.Subscribe(bProxy.OnNext)
    a, b

let solution2 buttonA buttonB =
    let aGivenB b =
        Observable.WithLatestFrom(OnClick buttonA, b, fun click bValue -> bValue)
                  .Scan(fun acc x -> acc + x)
                  .StartWith(0)
    let bGivenA a =
        Observable.Sample(a, OnClick buttonB)
                  .StartWith(1)
    let a, b = dependency aGivenB bGivenA
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonB)

This seems a bit better, but since there exists no method like dependency in the reactive library, I believe there exist a more idiomatic solution. It is also easy to introduce infinite recursion by using the second approach.

What is the recommended way to approach problems involving cycling dependency between streams, such as in the example above, in reactive programming?


Solution

  • EDIT:

    Here's an F# solution:

    type DU = 
        | A 
        | B 
    
    type State = { AValue : int; BValue : int }
    
    let solution2 (aObservable:IObservable<_>, bObservable:IObservable<_>) = 
    
        let union = aObservable.Select(fun _ -> A).Merge(bObservable.Select(fun _ -> B))
    
        let result = union.Scan({AValue = 0; BValue = 1}, fun state du -> match du with
            | A -> { state with AValue = state.AValue + state.BValue }
            | B -> { state with BValue = state.AValue }
        )
    
        result
    

    F# is actually a great language for this, thanks to the built-in discriminated unions and records. Here's an answer written in C#, with a custom Discriminated Union; my F# is rather rusty.

    The trick is to turn your two observables into one observable using discriminated union. So basically uniting a and b into one observable of a discriminated union:

    a : *---*---*---**
    b : -*-*--*---*---
    du: ab-ba-b-a-b-aa
    

    Once that is done, so you can react to if the item is an 'A' push or a 'B' push.

    Just to confirm, I assume there's no way to explicitly set the value embedded in ButtonA/ButtonB. If there is, those changes should be modeled as observables, and also worked into the discriminated union.

    var a = new Subject<Unit>();
    var b = new Subject<Unit>();
    var observable = a.DiscriminatedUnion(b)
        .Scan(new State(0, 1), (state, du) => du.Unify(
            /* A clicked case */_ => new State(state.A + state.B, state.B), 
            /* B clicked case */_ => new State(state.A, state.A)
        )
    );
    
    observable.Subscribe(state => Console.WriteLine($"a = {state.A}, b = {state.B}"));
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    b.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    b.OnNext(Unit.Default);
    

    Here's the classes this relies on in C#. Most of this translates easily to built-in F# types.

    public class State /*easily replaced with an F# record */
    {
        public State(int a, int b)
        {
            A = a;
            B = b;
        }
    
        public int A { get; }
        public int B { get; }
    }
    
    /* easily replaced with built-in discriminated unions and pattern matching */
    public static class DiscriminatedUnionExtensions
    {
        public static IObservable<DiscriminatedUnionClass<T1, T2>> DiscriminatedUnion<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
        {
            return Observable.Merge(
                a.Select(t1 => DiscriminatedUnionClass<T1, T2>.Create(t1)),
                b.Select(t2 => DiscriminatedUnionClass<T1, T2>.Create(t2))
            );
        }
    
        public static IObservable<TResult> Unify<T1, T2, TResult>(this IObservable<DiscriminatedUnionClass<T1, T2>> source,
            Func<T1, TResult> f1, Func<T2, TResult> f2)
        {
            return source.Select(union => Unify(union, f1, f2));
        }
    
        public static TResult Unify<T1, T2, TResult>(this DiscriminatedUnionClass<T1, T2> union, Func<T1, TResult> f1, Func<T2, TResult> f2)
        {
            return union.Item == 1
                ? f1(union.Item1)
                : f2(union.Item2)
            ;
        }
    }
    
    public class DiscriminatedUnionClass<T1, T2>
    {
        private readonly T1 _t1;
        private readonly T2 _t2;
        private readonly int _item;
        private DiscriminatedUnionClass(T1 t1, T2 t2, int item)
        {
            _t1 = t1;
            _t2 = t2;
            _item = item;
        }
    
        public int Item
        {
            get { return _item; }
        }
    
        public T1 Item1
        {
            get { return _t1; }
        }
    
        public T2 Item2
        {
            get { return _t2; }
        }
    
        public static DiscriminatedUnionClass<T1, T2> Create(T1 t1)
        {
            return new DiscriminatedUnionClass<T1, T2>(t1, default(T2), 1);
        }
    
        public static DiscriminatedUnionClass<T1, T2> Create(T2 t2)
        {
            return new DiscriminatedUnionClass<T1, T2>(default(T1), t2, 2);
        }
    }