c# system.reactive

Rx operator to scan with prior value, with signal deriving from value itself


Say you have a source observable of type IObservable<uint>. I'm looking for a transformation function

IObservable<uint> Countdown(IObservable<uint> source)
{
  // how? Something like ...
  return source.Select(value => ProduceInnerObservable(value)).Switch();
}

that transforms source as follows:

  • When source produces a value, the resulting observable should immediately publish the same value
  • Say the last element published was N. In the absence of any new elements from source, the result observable should publish the value N-1 after N ticks; then N-2 after N-1 ticks; and so on, until 0 is published
  • When source publishes a new value, this timer-like behaviour is reset based on the new value

Example inputs/outputs:

Tick time   Source observable   Result observable
0           10                  10
10          -                   9
19          -                   8
21          2                   2
23          -                   1
24          -                   0
100         1                   1
101         -                   0
...         ...                 ...

What's the most elegant way to implement this?


Solution

  • Assuming one "tick" is a tenth of a second (100 ms), this should suit your needs:

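    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;
    
    static void Main()
    {
        var subject = new Subject<int>();
    
        // TimeInterval() pairs each value with the time elapsed since the previous one,
        // which makes the countdown delays visible in the console output.
        Countdown(subject).TimeInterval().Subscribe(x => Console.WriteLine(x));
    
        subject.OnNext(10);
    
        Thread.Sleep(4000);
    
        // A new source value replaces the countdown that is still running.
        subject.OnNext(2);
    
        Thread.Sleep(4000);
    
        subject.OnNext(1);
    
        // Keep the process alive so the timer-driven values can arrive.
        Console.ReadLine();
    }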
    
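    static IObservable<int> Countdown(IObservable<int> source)
    {
        // Each source value is published immediately (StartWith) and then counted
        // down to 0. Switch() drops the running countdown when a new value arrives.
        return source.Select(value => Observable.Generate(
            initialState: value - 1,
            condition: i => i >= 0,
            iterate: i => i - 1,
            resultSelector: i => i,
            // State i is published i + 1 ticks after its predecessor (1 tick = 100 ms),
            // so N-1 follows N after N ticks, N-2 follows N-1 after N-1 ticks, and so on.
            timeSelector: i => TimeSpan.FromMilliseconds((i + 1) * 100)
            ).StartWith(value)).Switch();
    }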