
Buffer by time or running sum for reactive extensions


I'm quite new to Reactive Extensions and want to buffer a stream by time or by a running sum that must not exceed a threshold (the size of each item is given by a lambda), whichever comes first, much like the existing Buffer overload that takes a count and a time span.

I have written my own Buffer implementation that works as expected: it uses an IScheduler to trigger the time-out, manages the buffers in memory itself, and emits a buffer whenever the accumulated sum exceeds the threshold. This feels rather low level, though, and I suspect there is a more elegant way to express it with the existing reactive operators, perhaps using the TBufferClosing overload of Buffer.

The best solution I have come up with so far is the following, but it has a drawback: the item that crosses the threshold is included in the buffer, so a buffer's sum can be larger than the requested maximum:

    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
    {
        // Share one subscription between the buffered values and the closing signal.
        var shared = source.Publish().RefCount();
    
        // The closing selector is invoked again for every new buffer,
        // so both the timer and the running sum restart per buffer.
        return shared.Buffer(() => Observable.Amb(
            Observable.Timer(bufferTimeSpan)
                .Select(_ => Unit.Default),
            shared.Select(sizeSelector)
                .Scan((a, b) => a + b)
                .SkipWhile(accumulated => accumulated < maxSize)
                .Select(_ => Unit.Default))
            );
    }
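To make the drawback concrete, here is a time-free sketch in plain C# (no Rx, so it runs on its own). The hypothetical helper SplitInclusive models the behavior described above, where a buffer closes once the running sum reaches maxSize and the crossing item ends up inside it; SplitExclusive models the behavior actually wanted, where a buffer closes before an item that would push the sum past maxSize. Both names are made up for illustration, and time-outs are ignored entirely.

```csharp
using System.Collections.Generic;

// Hypothetical illustration helpers: size threshold only, no time-outs.
public static class SizeSplitDemo
{
    // Close a buffer once the running sum reaches maxSize,
    // so the item that crosses the threshold is included.
    public static List<List<int>> SplitInclusive(IEnumerable<int> sizes, int maxSize)
    {
        var result = new List<List<int>>();
        var current = new List<int>();
        var sum = 0;
        foreach (var size in sizes)
        {
            current.Add(size);
            sum += size;
            if (sum >= maxSize)
            {
                result.Add(current);
                current = new List<int>();
                sum = 0;
            }
        }
        if (current.Count > 0) result.Add(current); // flush the remainder on completion
        return result;
    }

    // Close a buffer *before* adding an item that would push the sum past maxSize;
    // that item opens the next buffer. An empty buffer always accepts the item.
    public static List<List<int>> SplitExclusive(IEnumerable<int> sizes, int maxSize)
    {
        var result = new List<List<int>>();
        var current = new List<int>();
        var sum = 0;
        foreach (var size in sizes)
        {
            if (sum + size > maxSize && current.Count > 0)
            {
                result.Add(current);
                current = new List<int>();
                sum = 0;
            }
            current.Add(size);
            sum += size;
        }
        if (current.Count > 0) result.Add(current); // flush the remainder on completion
        return result;
    }
}
```

With sizes { 3, 4, 5, 2 } and maxSize 8, SplitInclusive yields [3, 4, 5] (sum 12, over the limit) and [2], while SplitExclusive yields [3, 4] and [5, 2]. A single item larger than maxSize still gets a buffer of its own in SplitExclusive, because the size check is skipped while the current buffer is empty.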

Is it possible to make this work with existing operators (by tweaking my version above or in a completely different way), or am I forced to stick with my custom Buffer implementation that manages the timers and buffers itself?


Solution

  • OK, this should work. Late answers are better than never. I don't think it can be done better than your attempt using only the Buffer operators, so here is a different approach.

    At its core this is a state-machine problem, which means you want a Scan solution. The catch is that two different sources can change your state: a new item and a time-out. Scan only works over a single source, so we have to combine those two event types into one stream.
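The idea of merging both event kinds into one stream and folding it with a single Scan can be tried without Rx. In this sketch, which is a simplified model rather than the answer's code, a plain sequence of int? stands in for the merged stream (a value is an item size, null is a time-out tick), and Scan is a hypothetical IEnumerable helper written here because LINQ to Objects has no Scan. The state tuple and the final Where/Select mirror the shape of the Rx solution in this answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StateMachineDemo
{
    // Hypothetical IEnumerable counterpart of Rx's Scan: yields every intermediate state.
    public static IEnumerable<TState> Scan<T, TState>(this IEnumerable<T> source, TState seed, Func<TState, T, TState> step)
    {
        var state = seed;
        foreach (var item in source)
        {
            state = step(state, item);
            yield return state;
        }
    }

    // Each event is an item size, or null for a time-out tick.
    // Returns the buffers that would be emitted, in order.
    public static List<List<int>> Run(IEnumerable<int?> events, int maxSize)
    {
        var seed = (items: new List<int>(), size: 0, emit: (List<int>)null);
        return events
            .Scan(seed, (state, e) =>
            {
                if (e == null) // time-out: emit whatever we have and start over
                    return (new List<int>(), 0, state.items);
                var newSize = state.size + e.Value;
                if (newSize > maxSize && state.items.Count > 0) // threshold: emit without this item
                    return (new List<int> { e.Value }, e.Value, state.items);
                // Copy instead of mutating, so an already emitted list is never changed.
                return (new List<int>(state.items) { e.Value }, newSize, null);
            })
            .Where(s => s.emit != null)
            .Select(s => s.emit)
            .ToList();
    }
}
```

For events { 3, 4, 5, null, 2, null } and maxSize 8 this returns [3, 4], [5] and [2]: the item 5 closes the first buffer without being added to it, and each time-out flushes whatever has accumulated.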

    I've done something similar before with discriminated unions, and that concept works here. First the solution (it uses the NuGet packages System.Reactive and System.Collections.Immutable):

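    using System;
    using System.Collections.Generic;
    using System.Collections.Immutable; // NuGet: System.Collections.Immutable
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    public static class X
    {
        public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
        {
            // Defer so that every subscriber gets its own time-out subject.
            return Observable.Defer(() =>
            {
                // Our time-out mechanism: OnNext is called whenever a new buffer window opens.
                var queue = new BehaviorSubject<Unit>(Unit.Default);
                // Switch drops the pending timer when a window is closed early by size,
                // so a stale time-out cannot close the next window prematurely.
                var timeouts = queue.Select(_ => Observable.Timer(bufferTimeSpan)).Switch();
    
                return source
                    .Publish(_source => _source
                        .Union(timeouts)
                        .ScanUnion(
                            (list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
                            (state, item) =>
                            { // item handler
                                var itemSize = sizeSelector(item);
                                var newSize = state.size + itemSize;
                                // Close the window *before* adding the item, so a buffer never
                                // exceeds maxSize (unless a single item does). The empty-list check
                                // avoids emitting an empty buffer for an oversized first item.
                                if (newSize > maxSize && !state.list.IsEmpty)
                                {
                                    queue.OnNext(Unit.Default);
                                    return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
                                }
                                return (state.list.Add(item), newSize, null);
                            },
                            (state, _) =>
                            { // time-out handler: emit what we have and open a new window
                                queue.OnNext(Unit.Default);
                                return (ImmutableList<TSource>.Empty, 0, state.list);
                            }
                        )
                        .Where(t => t.emitValue != null)
                        .Select(t => t.emitValue.ToList())
                        // Stop about one bufferTimeSpan after the source completes; Materialize
                        // turns OnCompleted into an OnNext that TakeUntil reacts to.
                        .TakeUntil(_source.IgnoreElements().Delay(bufferTimeSpan).Materialize())
                    );
            });
        }
    }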
    

    Explanation: Union combines two streams of different types into one stream whose items are either of type A or of type B. ScanUnion works just like Scan, but takes two functions, one for each of the two item types.

    The BehaviorSubject fires whenever a new buffer window opens, and the time-out stream derived from it delivers a tick to the Scan once bufferTimeSpan has elapsed. The state inside the Scan holds the list of currently buffered items and their total size. emitValue is set when a buffer window closes and carries the buffered values downstream; the Where filters out every state in which it is null.

    Here's the Discriminated Union helper code:

    using System;
    using System.Reactive.Linq;
    
    public static class DUnionExtensions
    {
        public class DUnion<T1, T2>
        {
            public DUnion(T1 t1)
            {
                Type1Item = t1;
                Type2Item = default(T2);
                IsType1 = true;
            }
    
            public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
            {
                Type2Item = t2;
                Type1Item = default(T1);
                IsType1 = false;
            }
    
            public bool IsType1 { get; }
            public bool IsType2 => !IsType1;
    
            public T1 Type1Item { get; }
            public T2 Type2Item { get; }
        }
    
        public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
        {
            return a.Select(x => new DUnion<T1, T2>(x))
                .Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
        }
    
        public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
                TState initialState,
                Func<TState, T1, TState> type1Handler,
                Func<TState, T2, TState> type2Handler)
        {
            return source.Scan(initialState, (state, u) => u.IsType1
                ? type1Handler(state, u.Type1Item)
                : type2Handler(state, u.Type2Item)
            );
        }
    }