How can I achieve the following logic with System.Reactive?
I need an IObservable<int> that produces new items (ints) on a timer while at least one subscriber exists. When a new subscriber subscribes, it should replay the last X items to that subscriber.
Example:
Let X = 5;
This is the code I use for my observable right now. I use Catch and Repeat because Collect() can throw an exception, and this way the subscription continues.
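    IObservable<int> Responses = Observable
        .Defer(() => Observable.Interval(_pollInterval, scheduler)
            // Collect returns a plain int, so Select is enough; ToObservable() is not needed.
            .Select(_ => Collect(monitoringDataProvider))
            // On failure emit -1 and complete; Repeat then resubscribes and restarts the timer.
            .Catch<int, Exception>(exception => Observable.Return(-1))
            .Repeat());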
    private int Collect(MonitoringDataProvider monitoringDataProvider)
    {
        // ...some logic that returns an int
    }
You are probably looking for the Replay operator:
    public static IConnectableObservable<TSource> Replay<TSource>(
        this IObservable<TSource> source,
        int bufferSize
    )
Returns a connectable observable sequence that shares a single subscription to the underlying sequence, replaying bufferSize notifications.
Usage example:
    IObservable<int> ReplayedResponses = Responses
        .Replay(bufferSize: 5)
        .RefCount();
The RefCount operator is one of the strategies available for connecting to the underlying observable: it connects when the first subscriber arrives and disconnects when the last one unsubscribes. Others are the AutoConnect operator and the manual Connect method.
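To see what a late subscriber receives, here is a minimal sketch, assuming the System.Reactive package is referenced. The Subject<int> is a hypothetical stand-in for the polling Responses sequence, used only to make the emissions deterministic; ReplayDemo and Run are illustrative names, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    // Returns everything the late subscriber observed.
    public static List<int> Run()
    {
        // Assumption: a Subject replaces the timer-driven Responses sequence.
        var source = new Subject<int>();

        IObservable<int> replayed = source
            .Replay(bufferSize: 5)
            .RefCount();

        var early = new List<int>();
        var late = new List<int>();

        // The first subscription makes RefCount connect to the source.
        IDisposable first = replayed.Subscribe(x => early.Add(x));

        for (int i = 1; i <= 8; i++)
        {
            source.OnNext(i);
        }

        // The late subscriber first gets the buffered items, then live ones.
        IDisposable second = replayed.Subscribe(x => late.Add(x));
        source.OnNext(9);

        second.Dispose();
        first.Dispose();

        return late; // 4, 5, 6, 7, 8 (replayed), then 9 (live)
    }
}
```

The first subscriber sees every value from 1 to 9, while the second one starts with the last five buffered values before receiving new ones.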