I'm using Reactive Extensions (Rx) in C# and want to filter events in the following way. Imagine I have the following originator sequence:
A B C D E F X G H I X J X X K L M N O X P
And I want to produce the following output:
E F X H I X J X X N O X
Basically, I would buffer (throttle?) events with a max bound (in the example this bound is 2), and when I get certain event (in this case event X), I want to flush that buffer to the output and start buffering again until I see the special event again.
I'm trying a few approaches but without any luck, and I imagine there should be an easy way to accomplish it that I am missing.
EDIT: one constraint, is that I expect to get TONS of events that are discarded, and only a few instances of X, so keeping a buffer in memory with thousands of events to read only the last 2 (or 20) is not really an option.
Here is a stab to answer my own question, please let me know if you see any issues with it.
public static class ObservableHelper
{
/// <summary>
/// Buffers entries that do no satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max
/// capacity. When an entry that satisfies the condition ocurrs, then it flushes the circular buffer and the new entry,
/// and starts buffering again.
/// </summary>
/// <typeparam name="T">The type of entry.</typeparam>
/// <param name="stream">The original stream of events.</param>
/// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param>
/// <param name="bufferSize">The buffer size for accumulated entries.</param>
/// <returns>An observable that has this filtering capability.</returns>
public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize)
{
if (stream == null) throw new ArgumentNullException("stream");
if (shouldFlush == null) throw new ArgumentNullException("shouldFlush");
if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize");
return System.Reactive.Linq.Observable.Create<T>(observer =>
{
var buffer = new CircularBuffer<T>(bufferSize);
var subscription = stream.Subscribe(
newItem =>
{
bool result;
try
{
result = shouldFlush(newItem);
}
catch (Exception ex)
{
return;
}
if (result)
{
foreach (var buffered in buffer.TakeAll())
{
observer.OnNext(buffered);
}
observer.OnNext(newItem);
}
else
{
buffer.Add(newItem);
}
},
observer.OnError,
observer.OnCompleted);
return subscription;
});
}
}
By the way, CircularBuffer does not exist out of the box, but the implementation is straightforward.
Then I just call:
data
.FlushOnTrigger(item => item == 'X', bufferSize: 2)
.Subscribe(Console.WriteLine);