c# system.reactive observer-pattern observable

Problem with Reactive Extension Observers


I was working on an app using Reactive Extensions and ran into the following problem:

Say I have two observables, P and Q. I want to build a third observable R such that, if two values of P arrive without a Q in between, R outputs 0, and if a Q arrives right after a P, R outputs the result of a method applied to those two values. Something like:

P0    Q0    ->    R0 = f(P0,Q0)    
P1          ->    R1 = 0    
P2    Q1    ->    R2 = f(P2,Q1)    
P3          ->    R3 = 0    
P4          ->    R4 = 0    
P5    Q2    ->    R5 = f(P5,Q2)
(...)

The values arrive from the observables in the following order:

P0 Q0 P1 P2 Q1 P3 P4 P5 Q2

Thanks for your help.


Solution

  • Suppose we have two methods:

    1. Before: merges two observable sequences into one by applying a selector function whenever the first observable produces an element right before the second one does.
    2. Without: produces a value, computed by a selector function, every time two items arrive from the first observable with no item from the second one in between.

    With these methods the problem is almost solved.

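    IObservable<TP> P = ...; // observable P
    IObservable<TQ> Q = ...; // observable Q
    
    // Two consecutive P values with no Q in between produce 0.
    // Without takes IObservable<object>, so TQ must be a reference type here.
    var PP = P.Without((prev, next) => 0, Q);
    // A Q that arrives right after a P produces f(p, q).
    // f must return int so that both sequences can be merged.
    var PQ = P.Before(Q, (p, q) => f(p, q));
    
    var resultSequence = PP.Merge(PQ);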
    

    And here are the two methods:

    using System;
    using System.Reactive.Subjects;

    public static class Observer
    {
        /// <summary>
        /// Merges two observable sequences into one observable sequence by using the selector function 
        /// whenever the first observable produces an element right before the second one.
        /// </summary>
        /// <param name="first"> First observable source.</param>
        /// <param name="second">Second observable source.</param>
        /// <param name="resultSelector">Function to invoke whenever the first observable produces an element right before the second one.</param>
        /// <returns>
        /// An observable sequence containing the result of combining elements of both sources 
        /// using the specified result selector function.
        /// </returns>
        public static IObservable<TResult> Before<TLeft, TRight, TResult>(this IObservable<TLeft> first, IObservable<TRight> second, Func<TLeft, TRight, TResult> resultSelector)
        {
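            // The subject relays results. The subscriptions below are never disposed.
            var result = new Subject<TResult>();
    
            // True while the most recent item came from first and has not been paired yet.
            bool firstCame = false;
            TLeft lastLeft = default(TLeft);
    
            first.Subscribe(item =>
            {
                firstCame = true;
                lastLeft = item;
            });
    
            second.Subscribe(item =>
            {
                // Emit only when this item directly follows an item from first.
                if (firstCame)
                    result.OnNext(resultSelector(lastLeft, item));
    
                firstCame = false;
            });
    
            return result;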
        }
    
        /// <summary>
        /// Produces a value by using the selector function every time two items arrive
        /// from <paramref name="first"/> without any item from the observables
        /// in <paramref name="second"/> arriving in between.
        /// </summary>
        /// <param name="first">Observable source to merge.</param>
        /// <param name="second">Observable list to ignore.</param>
        /// <param name="resultSelector">Function to invoke whenever the first observable produces two elements and none of the observables in <paramref name="second"/> produces an element in between.</param>
        /// <returns>
        /// An observable sequence containing the result of combining elements
        /// using the specified result selector function.
        /// </returns>
        public static IObservable<TResult> Without<TLeft, TResult>(this IObservable<TLeft> first, Func<TLeft, TLeft, TResult> resultSelector, params IObservable<object>[] second)
        {
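            // The subject relays results. The subscriptions below are never disposed.
            var result = new Subject<TResult>();
    
            // True while the most recent item came from first with no item from second since.
            bool firstCame = false;
            TLeft lastLeft = default(TLeft);
    
            first.Subscribe(item =>
            {
                // Emit only when the previous item also came from first.
                if (firstCame)
                    result.OnNext(resultSelector(lastLeft, item));
    
                firstCame = true;
                lastLeft = item;
            });
    
            // Any item from the ignored observables breaks the pair.
            foreach (var observable in second)
                observable.Subscribe(item => firstCame = false);
    
            return result;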
        }        
    }
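
As a variation on the two methods above, here is a minimal sketch that folds both rules into a single operator built with Observable.Create, so that the subscriptions to P and Q are disposed together with the downstream subscription. The names PairOrDefault, fallback, PairingSketch, and Demo are hypothetical and not part of System.Reactive. The sketch assumes the System.Reactive package is referenced, and it does not forward completion, which matches the original methods.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PairingSketch
{
    // Hypothetical helper: emits f(p, q) when a Q directly follows a P,
    // and `fallback` when a P directly follows another P.
    public static IObservable<TResult> PairOrDefault<TP, TQ, TResult>(
        this IObservable<TP> p,
        IObservable<TQ> q,
        Func<TP, TQ, TResult> f,
        TResult fallback)
    {
        return Observable.Create<TResult>(observer =>
        {
            var gate = new object();      // serializes callbacks from both sources
            bool hasPending = false;      // true when the latest item was a P
            TP pending = default(TP);

            var subP = p.Subscribe(
                item =>
                {
                    lock (gate)
                    {
                        // The previous P never got a Q, so report the fallback for it.
                        if (hasPending) observer.OnNext(fallback);
                        hasPending = true;
                        pending = item;
                    }
                },
                observer.OnError);

            var subQ = q.Subscribe(
                item =>
                {
                    lock (gate)
                    {
                        // Pair this Q with the P that came right before it, if any.
                        if (hasPending) observer.OnNext(f(pending, item));
                        hasPending = false;
                    }
                },
                observer.OnError);

            // Disposing the result unsubscribes from both sources.
            return new CompositeDisposable(subP, subQ);
        });
    }
}

public static class Demo
{
    // Replays the order from the question: P0 Q0 P1 P2 Q1 P3 P4 P5 Q2.
    public static List<string> Run()
    {
        var p = new Subject<string>();
        var q = new Subject<string>();
        var results = new List<string>();

        using (p.PairOrDefault(q, (a, b) => "f(" + a + "," + b + ")", "0")
                .Subscribe(r => results.Add(r)))
        {
            p.OnNext("P0"); q.OnNext("Q0");
            p.OnNext("P1");
            p.OnNext("P2"); q.OnNext("Q1");
            p.OnNext("P3"); p.OnNext("P4");
            p.OnNext("P5"); q.OnNext("Q2");
        }

        return results;
    }

    public static void Main()
    {
        // prints: f(P0,Q0) 0 f(P2,Q1) 0 0 f(P5,Q2)
        Console.WriteLine(string.Join(" ", Run()));
    }
}
```

Because the state lives inside the Observable.Create callback, each subscriber gets its own pairing state, and TQ can be a value type since no IObservable<object> conversion is involved.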