Search code examples
rx-javareactivex

ReactiveX emit null or sentinel value after timeout


Looking for a clean way to transform a source Observable to emit a single null (or sentinel value) after not emitting an item for some duration.

For example, if the source observable emits 1, 2, 3 then stops emitting for 10 seconds before emitting 4, 5, 6 I would like the emitted items to be 1, 2, 3, null, 4, 5, 6.

The use case is for displaying values in a UI where the displayed value should turn into a dash - or N/A if the last emitted value is stale/old.

I looked into the timeout operator but it terminates the Observable when the timeout occurs which is undesirable.

Using RxJava.


Solution

  • Based on akarnokd's answer and an answer in a similar question, an alternative implementation:

    A single sentinel value (as per the OP)

    If you're looking for a single value to indicate the lapse of time between emissions:

    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    
    final long duration = 100;
    final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
        .concatWith(Observable.never())
        .takeUntil(subject)
        .repeat();
    
    subject.mergeWith(timeout).subscribe(subscriber);
    
    subject.onNext(1,   0);
    subject.onNext(2, 100);
    subject.onNext(3, 200);
    
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
    
    subject.onNext(4,   0);
    subject.onNext(5, 100);
    subject.onNext(6, 200);
    
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
    
    subscriber.assertNoTerminalEvent();
    subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
    

    Continuous sentinel values

    If you're looking to receive values continuously after the source observable not emitting for some duration:

    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    
    final long duration = 100;
    final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
        .map(x -> -1)
        .takeUntil(subject)
        .repeat();
    
    subject.mergeWith(timeout).subscribe(subscriber);
    
    subject.onNext(1,   0);
    subject.onNext(2, 100);
    subject.onNext(3, 200);
    
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
    
    subject.onNext(4,   0);
    subject.onNext(5, 100);
    subject.onNext(6, 200);
    
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
    
    subscriber.assertNoTerminalEvent();
    subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
    

    The difference being the timeout observable and whether it's recurring or not.

    You can replace -1 with null as needed.

    All of the above is tested with RxJava 1.0.17 using Java 1.8.0_72.