Search code examples
.net system.reactive

Reactive Extensions non-overlapping, serial GroupBy (or WindowUntilChange)


I have tried to write a SerialGroupBy operator for Rx.NET. It should work like GroupBy, except that each time a new group is created, the previous group is completed, so there is never more than one group open at a time.

My current "best" implementation is something like this:

/// <summary>
/// First attempt at a serial (non-overlapping) GroupBy. It groups the shared source with
/// GroupByUntil and ends each group's lifetime when
/// <c>shared.DistinctUntilChanged(keySelector)</c> produces a value.
/// </summary>
/// <remarks>
/// Known problem (the one described in the question): GroupByUntil emits the new group and
/// pushes its first element before the previous group's duration observable signals. As a
/// result, the previous group's OnCompleted is observed after the new group's first OnNext,
/// so groups briefly overlap.
/// NOTE(review): DistinctUntilChanged always forwards the first element it observes, and each
/// group's duration observable is subscribed when that group opens. A group may therefore be
/// closed by the very next element even when that element has the same key — confirm.
/// </remarks>
public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
    this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
    // Publish so the grouping and every duration observable share one source subscription.
    stream.Publish(shared => 
        shared.GroupByUntil(keySelector, elementSelector, group => 
            // Duration of each group: ends on the first value this DistinctUntilChanged emits.
            shared.DistinctUntilChanged(keySelector)));

I expected each group to complete before the next group starts, as this test checks:

[Fact]
public void SerialGroupBy()
{
    // Arrange: a hot source whose second element starts a new group
    // (the key is the string length, and the two strings differ in length).
    var scheduler = new TestScheduler();

    var stream = scheduler.CreateHotObservable(
        OnNext(201, "First group"),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));

    // One observer records the messages of every group, so any overlap
    // between consecutive groups shows up in the recorded sequence.
    var observer = scheduler.CreateObserver<string>();

    // Act: subscribe the recording observer to each group as soon as it is emitted.
    // Doing this in Subscribe (not a side-effecting Select) keeps the intent explicit.
    stream.SerialGroupBy(x => x.Length, x => x)
        .Subscribe(group => group.Subscribe(observer));

    scheduler.Start();

    // Assert: the first group completes before the second group's first element arrives.
    observer.Messages.ShouldBeLike(
        OnNext(201, "First group"),
        OnCompleted<string>(202),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));
}

But the completion of the first group arrives too late:

OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group"));

I understand why: in GroupByUntil's implementation, the observer that receives new groups is notified before the duration observer that closes the old group. But how can I implement the operator so that groups never overlap?

I have tried several different ways, but it always ends up with a variant of the same problem.


Solution

  • This was what I came up with in the end:

    /// <summary>
    /// Provides <c>SerialGroupBy</c>, a GroupBy variant whose groups never overlap:
    /// the open group is completed before the next group is emitted.
    /// </summary>
    public static class SerialGroupByOperator
    {
        /// <summary>
        /// Splits <paramref name="stream"/> into consecutive runs of elements with equal keys,
        /// comparing keys with the default equality comparer for <typeparamref name="TKey"/>.
        /// </summary>
        /// <param name="stream">Source sequence.</param>
        /// <param name="keySelector">Computes the grouping key of each source element.</param>
        /// <param name="elementSelector">Projects each source element into the value pushed to its group.</param>
        /// <returns>
        /// A sequence of groups. At most one group is open at a time. A group is completed
        /// before the next one is emitted, and the last group is completed (or errored)
        /// before the outer sequence terminates.
        /// </returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
            this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
            SerialGroupBy(stream, keySelector, elementSelector, System.Collections.Generic.EqualityComparer<TKey>.Default);

        /// <summary>
        /// Splits <paramref name="stream"/> into consecutive runs of elements whose keys are equal
        /// according to <paramref name="comparer"/>.
        /// </summary>
        /// <param name="stream">Source sequence.</param>
        /// <param name="keySelector">Computes the grouping key of each source element.</param>
        /// <param name="elementSelector">Projects each source element into the value pushed to its group.</param>
        /// <param name="comparer">Decides whether a key continues the currently open group.</param>
        /// <returns>
        /// A sequence of groups. At most one group is open at a time. If the source or a
        /// selector fails, the error is delivered to the open group and then to the outer sequence.
        /// </returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
            this IObservable<TSource> stream,
            Func<TSource, TKey> keySelector,
            Func<TSource, TElement> elementSelector,
            System.Collections.Generic.IEqualityComparer<TKey> comparer)
        {
            if (stream == null)
            {
                throw new ArgumentNullException(nameof(stream));
            }
            if (keySelector == null)
            {
                throw new ArgumentNullException(nameof(keySelector));
            }
            if (elementSelector == null)
            {
                throw new ArgumentNullException(nameof(elementSelector));
            }
            if (comparer == null)
            {
                throw new ArgumentNullException(nameof(comparer));
            }

            return Observable.Create<IGroupedObservable<TKey, TElement>>(observer =>
            {
                // The group that is currently open, or null before the first element.
                // It lives inside Create so every subscription gets its own state.
                GroupedObservable<TKey, TElement> current = null;

                return stream
                    // Evaluating both selectors in Select routes any exception they throw
                    // through the onError handler below, so the open group is terminated too.
                    .Select(item => new { Key = keySelector(item), Element = elementSelector(item) })
                    .Subscribe(
                        next =>
                        {
                            if (current == null || !comparer.Equals(next.Key, current.Key))
                            {
                                // Complete the previous group *before* announcing the new one,
                                // so subscribers never see two groups open at once.
                                current?.Group.OnCompleted();
                                current = new GroupedObservable<TKey, TElement>(next.Key);

                                // Emitted before the element is pushed, so subscribers that
                                // subscribe synchronously to the group also get its first element.
                                observer.OnNext(current);
                            }

                            current.Group.OnNext(next.Element);
                        },
                        error =>
                        {
                            // Without this, a failure would leave the open group's subscribers
                            // waiting forever for a terminal notification.
                            current?.Group.OnError(error);
                            current = null;
                            observer.OnError(error);
                        },
                        () =>
                        {
                            current?.Group.OnCompleted();
                            current = null;
                            observer.OnCompleted();
                        });
            });
        }
    }
    
    /// <summary>
    /// One group emitted by SerialGroupBy: the key the group was created for, plus the
    /// subject through which the group's notifications flow.
    /// </summary>
    internal class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
    {
        /// <summary>The key this group was created with.</summary>
        public TKey Key { get; }

        /// <summary>
        /// Subject backing this group. Elements and terminal signals pushed into it are
        /// delivered to the observers currently subscribed through <see cref="Subscribe"/>.
        /// </summary>
        public ISubject<TElement> Group { get; } = new Subject<TElement>();

        /// <summary>Creates a fresh group for <paramref name="key"/>.</summary>
        /// <param name="key">Key associated with every element of the group.</param>
        public GroupedObservable(TKey key)
        {
            Key = key;
        }

        /// <summary>Subscribes <paramref name="observer"/> to the group's backing subject.</summary>
        /// <param name="observer">Observer to receive the group's notifications.</param>
        /// <returns>A handle that removes the subscription when disposed.</returns>
        public IDisposable Subscribe(IObserver<TElement> observer)
        {
            return Group.Subscribe(observer);
        }
    }