Tags: c#, observable, reactive-programming, system.reactive

Is there a way to cancel and replace an observable if it is producing too many values too fast?


I have an Observable that produces values at a variable rate. To avoid being overwhelmed with values I added a Throttle of three seconds, so I only get a value if no further values were published for three seconds. What I want, though, is to end the stream if I get a certain number of updates within a time period and replace it with another observable.

For example, if I get 50 updates within three seconds, end the stream and replace it with a different stream, similar to how Catch can replace an observable that was terminated by an exception with another one.

Something like the below, but no exception is being thrown so I can't use Catch:

myObservable
   .Throttle(TimeSpan.FromSeconds(3)) // Not sure if we need to remove Throttle
   .Catch<long, Exception>(e => Observable.Return(0L))  // Instead of catching an exception, some way to monitor how many updates are coming in before throttling
   .Subscribe(/* handler */);

EDIT: I added a marble diagram to try to show what I am looking for.

The initial observable produces values at a variable rate. Values 1-6 come in without ever forming a burst of 50 within 3 seconds, so they pass through to Throttle, and the final values 1, 5, and 6 are produced.

Then the initial observable produces values 7-60 within 3 seconds. This is where I am trying to do what "???" is showing. The idea is to recognize that 50 or more items were produced within the set timeframe, complete the original observable, and replace it with one I provide. This is similar to how you can give Catch an observable sequence to replace one that errored (as if the original sequence had thrown an exception when it produced that huge burst).

After the initial observable is replaced, the sequence continues with the new one, and the items it produces go through the existing Throttle.

If only 49 items come within the timespan checked in "???", those values will all pass through to Throttle and only the last one would be produced. If no updates come in at all then nothing happens and no output is produced.

Hopefully what I am asking is a bit more clear now.



Solution

  • Whenever you're trying to replace one observable with another without changing subscriptions you should use the Switch operator to, well, make the switch.

    The pattern looks like this:

    IObservable<T> final =
        original
            .Publish(published =>
                published
                    .TransformInSomeWayThatProducesAValueWhenYouWantToSwitch()
                    .Take(1) //only switch once
                    .Select(xs => replacement)
                    .StartWith(published)
                    .Switch());
    

    Here's a concrete example that, I think, achieves what you're after.

    var rng = new Random();
    IObservable<int> myObservable = Observable.Generate(0, x => true, x => x + 1, x => x, x => TimeSpan.FromSeconds(rng.NextDouble() / 9.75));
    IObservable<int> myReplacement = Observable.Return(-1);
    
    IObservable<int> myFinal =
        myObservable
            .Publish(published =>
                published
                    .Timestamp()
                    .Select(x => x.Timestamp)
                    .Buffer(50, 1)
                    .Where(xs => xs.Count == 50)
                    .Where(xs => xs.Last().Subtract(xs.First()).TotalSeconds < 3.0)
                    .Take(1)
                    .Select(xs => myReplacement)
                    .StartWith(published)
                    .Switch());
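
    To make the burst test easier to follow, here is a minimal sketch of the same sliding-window check written against a plain list of timestamps, without Rx. `Buffer(50, 1)` starts a new buffer at every element, the `Count == 50` filter drops the partial buffers emitted when the source completes, and the second filter compares the first and last timestamp of each full buffer. The class and method names (`BurstDetection`, `FirstBurstIndex`) are hypothetical and only for illustration.

    ```csharp
    using System;
    using System.Collections.Generic;

    public static class BurstDetection
    {
        // Returns the index of the event that completes the first burst,
        // i.e. the first run of `count` consecutive events whose first and
        // last timestamps are less than `window` apart. Returns -1 if no
        // such run exists.
        public static int FirstBurstIndex(
            IReadOnlyList<DateTimeOffset> timestamps, int count, TimeSpan window)
        {
            if (count < 1)
                throw new ArgumentOutOfRangeException(nameof(count));

            // `last` is the newest event in the window, `first` the oldest.
            for (int last = count - 1; last < timestamps.Count; last++)
            {
                int first = last - count + 1;
                if (timestamps[last] - timestamps[first] < window)
                    return last;
            }
            return -1;
        }
    }
    ```

    In the Rx query, the element at that index is the one that makes the filtered sequence produce its first value, which in turn triggers the switch.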
    

    A typical run for me will produce values like this:

    1
    2
    3
    ...
    109
    110
    111
    -1
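
    If the same check is needed in more than one place, the query above could be wrapped in an extension method. This is only a sketch: the names `BurstExtensions` and `ReplaceOnBurst` are made up for illustration, it assumes the System.Reactive package is referenced, and it uses the default scheduler for `Timestamp`.

    ```csharp
    using System;
    using System.Reactive.Linq;

    public static class BurstExtensions
    {
        // Passes `source` through until `count` consecutive values arrive
        // less than `window` apart, then switches to `replacement` (once).
        public static IObservable<T> ReplaceOnBurst<T>(
            this IObservable<T> source,
            int count,
            TimeSpan window,
            IObservable<T> replacement)
        {
            return source.Publish(published =>
                published
                    .Timestamp()
                    .Select(x => x.Timestamp)
                    .Buffer(count, 1)                             // sliding window of timestamps
                    .Where(xs => xs.Count == count)               // ignore partial windows
                    .Where(xs => xs[count - 1] - xs[0] < window)  // burst detected
                    .Take(1)                                      // only switch once
                    .Select(_ => replacement)
                    .StartWith(published)                         // begin with the original
                    .Switch());
        }
    }

    // Possible usage with the throttle from the question applied after the switch:
    //
    //   myObservable
    //       .ReplaceOnBurst(50, TimeSpan.FromSeconds(3), myReplacement)
    //       .Throttle(TimeSpan.FromSeconds(3))
    //       .Subscribe(x => Console.WriteLine(x));
    ```

    Placing Throttle after the switch means the burst detection sees every raw value, while the subscriber still only receives a value after three quiet seconds, whichever observable is currently active.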