c# events system.reactive circular-buffer

Rx: How to buffer events (ring buffer) and only flush them when a special event occurs?


I'm using Reactive Extensions (Rx) in C# and want to filter events in the following way. Imagine I have the following originator sequence:

A B C D E F X G H I X J X X K L M N O X P

And I want to produce the following output:

E F X H I X J X X N O X

Basically, I want to buffer (throttle?) events up to a maximum bound (2 in this example), keeping only the most recent ones. When a certain event arrives (X in this case), I want to flush that buffer to the output, followed by the event itself, and start buffering again until the next special event.

I've tried a few approaches without any luck, and I imagine there is an easy way to accomplish this that I am missing.

EDIT: One constraint is that I expect TONS of events that get discarded and only a few instances of X, so keeping thousands of events in memory just to read the last 2 (or 20) is not really an option.


Solution

  • Here is a stab at answering my own question; please let me know if you see any issues with it.

    public static class ObservableHelper
    {
        /// <summary>
        /// Buffers entries that do not satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max
        /// capacity. When an entry that satisfies the condition occurs, it flushes the circular buffer followed by the new entry,
        /// and starts buffering again.
        /// </summary>
        /// <typeparam name="T">The type of entry.</typeparam>
        /// <param name="stream">The original stream of events.</param>
        /// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param>
        /// <param name="bufferSize">The buffer size for accumulated entries.</param>
        /// <returns>An observable that has this filtering capability.</returns>
        public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize)
        {
            if (stream == null) throw new ArgumentNullException("stream");
            if (shouldFlush == null) throw new ArgumentNullException("shouldFlush");
            if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize");
    
            return System.Reactive.Linq.Observable.Create<T>(observer =>
            {
                var buffer = new CircularBuffer<T>(bufferSize);
                var subscription = stream.Subscribe(
                    newItem =>
                        {
                            bool result;
                            try
                            {
                                result = shouldFlush(newItem);
                            }
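                            catch (Exception ex)
                            {
                                // Forward predicate failures to the observer instead of silently dropping the item.
                                observer.OnError(ex);
                                return;
                            }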
    
                            if (result)
                            {
                                foreach (var buffered in buffer.TakeAll())
                                {
                                    observer.OnNext(buffered);
                                }
    
                                observer.OnNext(newItem);
                            }
                            else
                            {
                                buffer.Add(newItem);
                            }
                        },
                    observer.OnError,
                    observer.OnCompleted);
    
                return subscription;
            });
        }
    }
    

    By the way, CircularBuffer does not exist out of the box, but the implementation is straightforward.
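
    One possible sketch of such a CircularBuffer<T> is shown below. It is an assumption rather than a library type: it covers only the two members FlushOnTrigger calls (Add and TakeAll), it is not thread-safe, and the Demo.Replay helper is a hypothetical stand-in that drives the buffer by hand, without Rx, to check the logic against the sequence in the question.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical sketch: a fixed-capacity ring buffer. Not thread-safe.
    public sealed class CircularBuffer<T>
    {
        private readonly T[] items;
        private int start; // index of the oldest retained item
        private int count; // number of items currently retained

        public CircularBuffer(int capacity)
        {
            if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
            items = new T[capacity];
        }

        // Appends an item, overwriting the oldest one once the buffer is full.
        public void Add(T item)
        {
            if (count < items.Length)
            {
                items[(start + count) % items.Length] = item;
                count++;
            }
            else
            {
                items[start] = item;
                start = (start + 1) % items.Length;
            }
        }

        // Returns a snapshot of the retained items, oldest first, and empties the buffer.
        public T[] TakeAll()
        {
            var result = new T[count];
            for (int i = 0; i < count; i++)
            {
                int index = (start + i) % items.Length;
                result[i] = items[index];
                items[index] = default(T); // release the slot so the item can be collected
            }
            start = 0;
            count = 0;
            return result;
        }
    }

    // Hypothetical helper: replays the flush-on-trigger logic by hand (no Rx).
    public static class Demo
    {
        public static string Replay(string events, char trigger, int bufferSize)
        {
            var buffer = new CircularBuffer<char>(bufferSize);
            var output = new List<char>();
            foreach (char e in events)
            {
                if (e == trigger)
                {
                    output.AddRange(buffer.TakeAll()); // buffered items first
                    output.Add(e);                     // then the trigger itself
                }
                else
                {
                    buffer.Add(e);
                }
            }
            return string.Join(" ", output);
        }
    }
    ```

    With the sequence from the question, Demo.Replay("ABCDEFXGHIXJXXKLMNOXP", 'X', 2) returns "E F X H I X J X X N O X"; the trailing P stays in the buffer because no further X arrives. Memory use stays bounded by the capacity no matter how many events are discarded.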

    Then I just call:

            data
                .FlushOnTrigger(item => item == 'X', bufferSize: 2)
                .Subscribe(Console.WriteLine);