I am trying to write a `SerialGroupBy` operator for Rx.NET. It should work like `GroupBy`, except that each time a new group is created, the previous group is completed, so there is never more than one group open at a time.
My current "best" implementation is something like this:
/// <summary>
/// First attempt at a serial GroupBy: shares the source via <c>Publish</c> and uses
/// <c>GroupByUntil</c>, ending each group when a <c>DistinctUntilChanged(keySelector)</c>
/// subscription on the shared source emits.
/// </summary>
/// <remarks>
/// As observed in the test output, the close signal for a group arrives only after
/// <c>GroupByUntil</c> has already opened the next group, so consecutive groups overlap briefly.
/// NOTE(review): <c>DistinctUntilChanged</c> always emits the first element it sees after
/// subscribing, so a group may also close on the next element even if its key is unchanged — confirm.
/// </remarks>
public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
    this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
{
    return stream.Publish(published =>
    {
        // Duration selector: a fresh DistinctUntilChanged subscription per group, on the shared source.
        Func<IGroupedObservable<TKey, TElement>, IObservable<TSource>> closeWhen =
            _ => published.DistinctUntilChanged(keySelector);

        return published.GroupByUntil(keySelector, elementSelector, closeWhen);
    });
}
I expected each group to complete before the next group starts, as this test checks:
[Fact]
public void SerialGroupBy()
{
    // Arrange: two consecutive runs of equal-length strings form two groups (keys 11 and 12).
    var scheduler = new TestScheduler();
    var stream = scheduler.CreateHotObservable(
        OnNext(201, "First group"),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));
    var observer = scheduler.CreateObserver<string>();

    // Feed every group's notifications into the same observer. Subscribing to each group
    // directly keeps the side effect out of a Select projection.
    stream.SerialGroupBy(x => x.Length, x => x)
        .Subscribe(group => group.Subscribe(observer));

    // Act
    scheduler.Start();

    // Assert: the first group completes before the second group's first element is observed.
    observer.Messages.ShouldBeLike(
        OnNext(201, "First group"),
        OnCompleted<string>(202),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));
}
But the first group's completion arrives too late:
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group"));
I understand why: in the implementation of `GroupByUntil`, the observer that opens a new group is notified before the observer that closes the old one. But how can I implement the operator so that groups never overlap?
I have tried several different approaches, but they all end up with a variant of the same problem.
This was what I came up with in the end:
/// <summary>
/// Provides <c>SerialGroupBy</c>: a <c>GroupBy</c> variant in which at most one group is open at a time.
/// </summary>
public static class SerialGroupByOperator
{
    /// <summary>
    /// Groups runs of consecutive source elements that share a key. When the key changes, the
    /// current group is completed <em>before</em> the next group is emitted, so groups never overlap.
    /// </summary>
    /// <remarks>
    /// A key that reappears after a different key starts a brand-new group.
    /// Each group is backed by a plain subject and its first element is pushed right after the group
    /// is emitted, so subscribe to a group synchronously inside <c>OnNext</c> or its elements are missed.
    /// If the source (or a selector) fails, the error is delivered to the open group and then to the
    /// outer observer.
    /// </remarks>
    /// <param name="stream">The source sequence.</param>
    /// <param name="keySelector">Extracts the grouping key from each source element.</param>
    /// <param name="elementSelector">Maps each source element to the value pushed into its group.</param>
    /// <param name="comparer">
    /// Compares consecutive keys; <c>null</c> uses
    /// <see cref="System.Collections.Generic.EqualityComparer{T}.Default"/>.
    /// </param>
    /// <returns>A sequence of non-overlapping groups, in source order.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="stream"/>, <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.
    /// </exception>
    public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
        this IObservable<TSource> stream,
        Func<TSource, TKey> keySelector,
        Func<TSource, TElement> elementSelector,
        System.Collections.Generic.IEqualityComparer<TKey> comparer = null)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
        if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));

        var keyComparer = comparer ?? System.Collections.Generic.EqualityComparer<TKey>.Default;

        return Observable.Create<IGroupedObservable<TKey, TElement>>(observer =>
        {
            // Per-subscription state: the group that is currently open (null before the first element).
            GroupedObservable<TKey, TElement> current = null;

            return stream
                // Exceptions thrown here (e.g. by the selectors) are routed by Do to the onError handler below.
                .Do(next =>
                {
                    var key = keySelector(next);
                    if (current == null || !keyComparer.Equals(current.Key, key))
                    {
                        // Close the previous group *before* announcing the new one, so no subscriber
                        // ever sees two open groups at once.
                        current?.Group.OnCompleted();
                        current = new GroupedObservable<TKey, TElement>(key);
                        observer.OnNext(current);
                    }
                    current.Group.OnNext(elementSelector(next));
                })
                .Subscribe(
                    _ => { },
                    error =>
                    {
                        // Without this, subscribers of the open group would wait forever.
                        current?.Group.OnError(error);
                        observer.OnError(error);
                    },
                    () =>
                    {
                        current?.Group.OnCompleted();
                        observer.OnCompleted();
                    });
        });
    }
}
/// <summary>
/// A single group handed out by <c>SerialGroupBy</c>: the group's key plus the subject through
/// which the operator pushes the group's elements and completion.
/// </summary>
internal class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
{
    /// <summary>Creates a group for <paramref name="key"/> with a fresh, subscriber-less subject.</summary>
    public GroupedObservable(TKey key)
    {
        Key = key;
    }

    /// <summary>The key shared by every element in this group.</summary>
    public TKey Key { get; }

    /// <summary>Subject that carries this group's notifications to its subscribers.</summary>
    public ISubject<TElement> Group { get; } = new Subject<TElement>();

    /// <summary>Subscribes <paramref name="observer"/> to this group's notifications.</summary>
    public IDisposable Subscribe(IObserver<TElement> observer)
    {
        return Group.Subscribe(observer);
    }
}