Search code examples
c#system.reactiverx.net

Make System.Reactive repeat last X items on new subscribe


How can I achieve next logic with System.Reactive?

IObservable<int> that produce new items (ints) on timer when at least one subscriber exist and when new subscriber subscribes, it repeats last X items for new subscriber. Example: Let X = 5;

  1. SubscriberA subscribes and Observable emits 1, 2, 3, 4, 5, 6, 7
  2. SubscriberB subscribes and gets 3, 4, 5, 6, 7 (5 last items), than both of them continue to receive new items like 8, 9, 10, 11 etc.
  3. If SuscriberC subscribes, than it will get 7, 8, 9, 10, 11 and so on.

This is code that I use for my observable right now. I use catch and repeat to because Collect() can throw exception, so subscribe will continue.

IObservable<int> Responses = Observable
    .Defer(() => Observable.Interval(_pollInterval, scheduler)
    .SelectMany(_ => Collect(monitoringDataProvider).ToObservable())
    .Catch<int, Exception>(exception => Observable.Return(-1))
    .Repeat());

private int Collect(MonitoringDataProvider monitoringDataProvider)
{
    //..some logic that returns int;
}

Solution

  • You are probably searching for the Replay operator:

    public static IConnectableObservable<TSource> Replay<TSource>(
        this IObservable<TSource> source,
        int bufferSize
    )
    

    Returns a connectable observable sequence that shares a single subscription to the underlying sequence, replaying bufferSize notifications.

    Usage example:

    IObservable<int> ReplayedResponses = Responses
        .Replay(bufferSize: 5)
        .RefCount();
    

    The RefCount operator is one of the strategies available in order to connect to the underlying observable. Others are the AutoConnect operator, and the manual Connect method.