Rx.NET Take an element and subscribe again after some time


I need a kind of throttling that works a bit differently: take an element from a sequence, unsubscribe, and subscribe again after 1 second. In other words, I want to ignore all elements for 1 second after an element is taken:

Input:  (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...

How can I achieve this with Rx.NET?
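
To pin down the intended behaviour, the timeline above can be replayed against a small model that looks only at arrival times. This is a rough sketch in plain C#, outside Rx.NET. The `Gate` class and its `TakeThenIgnore` method are names made up for illustration, the arrival times are derived from the gaps in the diagram, and it assumes that an element arriving exactly 1 second after the last taken one passes, as (5) does in the diagram.

```csharp
using System;
using System.Collections.Generic;

public static class Gate
{
    // Hypothetical helper: given (arrival time in ms, value) pairs in arrival order,
    // keep a value only if at least `quietMs` have passed since the last kept value.
    public static List<int> TakeThenIgnore(
        IEnumerable<(long TimeMs, int Value)> input, long quietMs)
    {
        var kept = new List<int>();
        long? lastKept = null; // arrival time of the most recently kept value

        foreach (var (timeMs, value) in input)
        {
            if (lastKept == null || timeMs - lastKept.Value >= quietMs)
            {
                kept.Add(value);
                lastKept = timeMs;
            }
        }
        return kept;
    }

    // Arrival times implied by the diagram: 0, 100, 300, 1800, 2800, 3300 ms.
    public static List<int> Demo() => TakeThenIgnore(
        new (long, int)[] { (0, 1), (100, 2), (300, 3), (1800, 4), (2800, 5), (3300, 6) },
        1000);
}
```

`Gate.Demo()` returns the values 1, 4 and 5, which matches the Output line of the diagram.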


Solution

  • Try this:

    Input
        .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
        .SelectMany(xs => xs.Take(1));
    

    Here's a test:

    var query =
        Observable
            .Interval(TimeSpan.FromSeconds(0.2))
            .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
            .SelectMany(xs => xs.Take(1));
    

    This produced:

    0 
    5 
    10 
    14 
    19 
    24 
    29 
    34 
    39 
    

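    The jump from 10 to 14 (rather than 15) is not an error in the query. Every fifth tick of the 0.2-second interval lands almost exactly on a 1-second window boundary, and because the timers run on separate threads, that tick can fall on either side of the boundary.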
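
One thing to keep in mind with the query above: `Window` opens consecutive 1-second windows starting at subscription, so the quiet period is not measured from the element that was taken. If the quiet period should start at each taken element, as in the question's diagram, one possible variation is to timestamp each element and compare it with the last taken one. The sketch below is an assumption-laden alternative, not the method from the answer: `QuietPeriodExtensions` and `TakeThenIgnore` are hypothetical names, and timestamps come from the default clock used by `Timestamp()` at arrival.

```csharp
using System;
using System.Reactive.Linq;

public static class QuietPeriodExtensions
{
    // Hypothetical helper, not part of Rx.NET: passes an element through, then
    // drops every element that arrives less than `quiet` after the last passed one.
    public static IObservable<T> TakeThenIgnore<T>(this IObservable<T> source, TimeSpan quiet)
    {
        return source
            .Timestamp() // attach the arrival time to each element
            .Scan(
                (LastTaken: DateTimeOffset.MinValue, Take: false, Value: default(T)),
                (state, x) => x.Timestamp - state.LastTaken >= quiet
                    ? (x.Timestamp, true, x.Value)        // quiet period over: take it
                    : (state.LastTaken, false, x.Value))  // still quiet: mark as dropped
            .Where(state => state.Take)
            .Select(state => state.Value);
    }
}
```

With this helper in scope, the query would read `Input.TakeThenIgnore(TimeSpan.FromSeconds(1.0))`. Unlike the `Window` version it needs no timers of its own, at the cost of carrying a small piece of state through `Scan`.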