Search code examples
c#.netsystem.reactive

Rx.NET - Set capacity and drop oldest


System.Threading.Channels allows us to specify a capacity and full mode = DropOldest. Basically when the channel is full and a message is being processed for 10 seconds, during these 10 seconds it will drop the new records.

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

Is there a way to do that with Rx?


Solution

  • The Rx observables don't have the property of "being full". An observable sequence is not a storage of messages, like a Queue<T> or a Channel<T> is. It's just a generator/propagator of messages. Some Rx operators have internal queues in order to perform their work, like the Concat and the Zip operators for example. Generally these queues are hidden, and cannot be configured to be "lossy".

    An Rx component that might have the functionality that you are looking for is the ReplaySubject<T>. This component can be configured with the maximum number of messages that it can replay (int bufferSize), and with the maximum duration that it can store each message before discarding it (TimeSpan window). If you set the bufferSize but not the window, the ReplaySubject<T> will eventually buffer the specified number of items, and then the buffer will retain the same size forever. Each incoming message will cause the oldest buffered message to be dropped. A ReplaySubject<T> is not a consumable queue like the Channel<T>. It is always ready to propagate all the messages in its buffer, to any new subscribers that might come by in the future.

    The ReplaySubject<T> is used as propagator by the Replay operator, similarly to how the Publish operator is backed internally by a Subject<T>.