I was working on an app using Reactive Extensions and got into the following problem:
say i have two observers P and Q, i want to build a third observer R that if two values of P comes without a Q, R outputs 0. And if after a P comes a Q, R outputs the result of a method passing those values, something like:
P0 Q0 -> R0 = f(P0,Q0)
P1 -> R1 = 0
P2 Q1 -> R2 = f(P2,Q1)
P3 -> R3 = 0
P4 -> R4 = 0
P5 Q2 -> R5 = f(P5,Q2)
(...)
and the values come into the obsevers in the following order:
P0 Q0 P1 P2 Q1 P3 P4 P5 Q2
thanks for your help.
Suppose we have two methods
With this methods the problem is almost solved.
IObservable<TP> P = // observer P
IObservable<TQ> Q = // observer Q
var PP = P.Without((prev, next) => 0, Q);
var PQ = P.Before(Q, (p,q) => f(p,q)); // apply the function
var ResultSecuence = PP.Merge(PQ);
And here are the two methods
public static class Observer
{
/// <summary>
/// Merges two observable sequences into one observable sequence by using the selector function
/// whenever the first observable produces an element rigth before the second one.
/// </summary>
/// <param name="first"> First observable source.</param>
/// <param name="second">Second observable source.</param>
/// <param name="resultSelector">Function to invoke whenever the first observable produces an element rigth before the second one.</param>
/// <returns>
/// An observable sequence containing the result of combining elements of both sources
/// using the specified result selector function.
/// </returns>
public static IObservable<TResult> Before<TLeft, TRight, TResult>(this IObservable<TLeft> first, IObservable<TRight> second, Func<TLeft, TRight, TResult> resultSelector)
{
var result = new Subject<TResult>();
bool firstCame = false;
TLeft lastLeft = default(TLeft);
first.Subscribe(item =>
{
firstCame = true;
lastLeft = item;
});
second.Subscribe(item =>
{
if (firstCame)
result.OnNext(resultSelector(lastLeft, item));
firstCame = false;
});
return result;
}
/// <summary>
/// Merges an observable sequence into one observable sequence by using the selector function
/// every time two items came from <paramref name="first"/> without any item of any observable
/// in <paramref name="second"/>
/// </summary>
/// <param name="first"> Observable source to merge.</param>
/// <param name="second"> Observable list to ignore.</param>
/// <param name="resultSelector">Function to invoke whenever the first observable produces two elements without any of the observables in the secuence produces any element</param>
/// <returns>
/// An observable sequence containing the result of combining elements
/// using the specified result selector function.
/// </returns>
public static IObservable<TResult> Without<TLeft, TResult>(this IObservable<TLeft> first, Func<TLeft, TLeft, TResult> resultSelector,params IObservable<object>[] second)
{
var result = new Subject<TResult>();
bool firstCame = false;
TLeft lastLeft = default(TLeft);
first.Subscribe(item =>
{
if (firstCame)
result.OnNext(resultSelector(lastLeft, item));
firstCame = true;
lastLeft = item;
});
foreach (var observable in second)
observable.Subscribe(item => firstCame = false);
return result;
}
}