c# f# rx.net

How to combine two different GroupedStreams in Rx.NET?


This question is similar, but it does not apply to my case: that user needed to merge observable streams from the same IGroupedObservable, while I want to combine streams from different groups.

I have the following structures and streams:

type A = {
  Id: int
  Value: int
}

type B = {
  Id: int
  Value: int
}

//subjects to test input, just any source of As and Bs
let subjectA: Subject<A> = Subject.broadcast
let subjectB: Subject<B> = Subject.broadcast

//grouped streams
let groupedA: IObservable<IGroupedObservable<int, A>> = Observable.groupBy (fun (a: A) -> a.Id) subjectA
let groupedB: IObservable<IGroupedObservable<int, B>> = Observable.groupBy (fun (b: B) -> b.Id) subjectB

I want to merge the inner observables of A and B whose group keys are equal (a group from groupedA and a group from groupedB with the same Key), and get an observable of (A, B) pairs where A.Id = B.Id.

The signature I want is something like IObservable<IGroupedObservable<int, A>> -> IObservable<IGroupedObservable<int, B>> -> IObservable<IGroupedObservable<int, (A, B)>> where for all (A, B), A.Id = B.Id

I tried a number of variations of combineLatest, groupJoin, filter and map, but without success.

I'm using F# with Rx.NET and FSharp.Control.Reactive, but if you know the answer in C# (or any language, really), please post it.


Solution

  • Here is a custom operator GroupJoin that you could use. It is based on the Select, Merge, GroupBy and Where operators:

    /// <summary>
    /// Groups and joins the elements of two observable sequences, based on common keys.
    /// </summary>
    public static IObservable<(TKey Key, IObservable<TLeft> Left, IObservable<TRight> Right)>
        GroupJoin<TLeft, TRight, TKey>(
        this IObservable<TLeft> left,
        IObservable<TRight> right,
        Func<TLeft, TKey> leftKeySelector,
        Func<TRight, TKey> rightKeySelector,
        IEqualityComparer<TKey> keyComparer = null)
    {
        // Arguments validation omitted
        keyComparer ??= EqualityComparer<TKey>.Default;
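        // Tag each element with its source (1 = left, 2 = right) and its key,
        // so both sequences share one tuple type and can be merged.
        return left
            .Select(x => (x, (TRight)default, Type: 1, Key: leftKeySelector(x)))
            .Merge(right.Select(x => ((TLeft)default, x, Type: 2, Key: rightKeySelector(x))))
            // Group the merged sequence by key, then split each group back
            // into its left and right elements.
            .GroupBy(e => e.Key, keyComparer)
            .Select(g => (
                g.Key,
                g.Where(e => e.Type == 1).Select(e => e.Item1),
                g.Where(e => e.Type == 2).Select(e => e.Item2)
            ));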
    }
    

    Usage example:

    var subjectA = new Subject<A>();
    var subjectB = new Subject<B>();
    
    IObservable<IGroupedObservable<int, (A, B)>> query = subjectA
        .GroupJoin(subjectB, a => a.Id, b => b.Id)
        .SelectMany(g => g.Left.Zip(g.Right, (a, b) => (g.Key, a, b)))
        .GroupBy(e => e.Key, e => (e.a, e.b));
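
    To see how the pieces fit together, here is a minimal end-to-end sketch. It assumes the System.Reactive package is referenced and a C# 9 or later compiler. The A and B records are C# stand-ins for the F# records in the question, and the names GroupJoinExtensions, Demo and Run are made up for illustration. The operator is repeated so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical C# counterparts of the F# records in the question.
public record A(int Id, int Value);
public record B(int Id, int Value);

// Extension methods must live in a static class; the class name is arbitrary.
public static class GroupJoinExtensions
{
    public static IObservable<(TKey Key, IObservable<TLeft> Left, IObservable<TRight> Right)>
        GroupJoin<TLeft, TRight, TKey>(
        this IObservable<TLeft> left,
        IObservable<TRight> right,
        Func<TLeft, TKey> leftKeySelector,
        Func<TRight, TKey> rightKeySelector,
        IEqualityComparer<TKey> keyComparer = null)
    {
        keyComparer ??= EqualityComparer<TKey>.Default;
        return left
            .Select(x => (x, (TRight)default, Type: 1, Key: leftKeySelector(x)))
            .Merge(right.Select(x => ((TLeft)default, x, Type: 2, Key: rightKeySelector(x))))
            .GroupBy(e => e.Key, keyComparer)
            .Select(g => (
                g.Key,
                g.Where(e => e.Type == 1).Select(e => e.Item1),
                g.Where(e => e.Type == 2).Select(e => e.Item2)
            ));
    }
}

public static class Demo
{
    // Pushes a few test values through the query and returns the formatted pairs.
    public static List<string> Run()
    {
        var output = new List<string>();
        var subjectA = new Subject<A>();
        var subjectB = new Subject<B>();

        IObservable<IGroupedObservable<int, (A, B)>> query = subjectA
            .GroupJoin(subjectB, a => a.Id, b => b.Id)
            .SelectMany(g => g.Left.Zip(g.Right, (a, b) => (g.Key, a, b)))
            .GroupBy(e => e.Key, e => (e.a, e.b));

        // Subscribe to every group as it appears and record each pair.
        using var subscription = query.Subscribe(group =>
            group.Subscribe(pair =>
                output.Add($"Key {group.Key}: A={pair.Item1.Value}, B={pair.Item2.Value}")));

        subjectA.OnNext(new A(1, 10));  // waits for a B with Id 1
        subjectB.OnNext(new B(2, 200)); // waits for an A with Id 2
        subjectB.OnNext(new B(1, 100)); // completes the pair for key 1
        subjectA.OnNext(new A(2, 20));  // completes the pair for key 2
        subjectA.OnNext(new A(1, 11));  // buffered by Zip until another B with Id 1 arrives

        return output;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
        // Should print:
        // Key 1: A=10, B=100
        // Key 2: A=20, B=200
    }
}
```

    Note that Zip pairs the n-th A with the n-th B for each key, so an unmatched element is buffered until its counterpart arrives. If you want each new element paired with the most recent element from the other side instead, replacing Zip with CombineLatest inside the SelectMany is one option.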