I'm looking for a clean way to transform a source Observable so that it emits a single null (or sentinel value) after not emitting an item for some duration. For example, if the source observable emits 1, 2, 3, then stops emitting for 10 seconds before emitting 4, 5, 6, I would like the emitted items to be 1, 2, 3, null, 4, 5, 6.
The use case is displaying values in a UI where the displayed value should turn into a dash (-) or N/A if the last emitted value is stale.
I looked into the timeout operator, but it terminates the Observable when the timeout occurs, which is undesirable.
Using RxJava.
Based on akarnokd's answer and an answer in a similar question, an alternative implementation:
If you're looking for a single value to indicate the lapse of time between emissions:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
        .concatWith(Observable.never())
        .takeUntil(subject)
        .repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
If you're looking to receive the sentinel repeatedly, once per elapsed duration, for as long as the source observable stays silent:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
        .map(x -> -1)
        .takeUntil(subject)
        .repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
The only difference between the two is the timeout observable and whether it's recurring or not. You can replace -1 with null as needed.
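To make the single versus recurring behaviour easier to see without RxJava on the classpath, here is a minimal plain-Java sketch that replays a fixed timeline and inserts the sentinel where the source is silent. It is only a model of the semantics, not the RxJava implementation above: the class StaleMarker and the method withSentinels are hypothetical names, the timer is assumed to start at the first item rather than at subscription, and a source item that arrives exactly at the deadline is assumed to win the tie.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of RxJava): models where a sentinel would be
// inserted into a stream, given the emission times of the source items.
class StaleMarker {

    /**
     * @param times     emission times in milliseconds, in ascending order
     * @param values    emitted values, same length as times
     * @param timeout   silence in milliseconds after which the sentinel appears
     * @param sentinel  marker value, e.g. -1
     * @param recurring false: at most one sentinel per gap;
     *                  true: one sentinel per elapsed timeout within the gap
     */
    static List<Integer> withSentinels(long[] times, int[] values, long timeout,
                                       int sentinel, boolean recurring) {
        if (timeout <= 0 || times.length != values.length) {
            throw new IllegalArgumentException("timeout must be > 0 and arrays must match");
        }
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                // Walk the deadlines that fall strictly before the next item.
                // Assumption: an item arriving exactly at a deadline wins the tie.
                for (long t = times[i - 1] + timeout; t < times[i]; t += timeout) {
                    out.add(sentinel);
                    if (!recurring) {
                        break; // single-sentinel variant: stop after the first one
                    }
                }
            }
            out.add(values[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 100, 200, 550, 650, 750};
        int[] values = {1, 2, 3, 4, 5, 6};
        // One sentinel for the 350 ms gap between 3 and 4.
        System.out.println(withSentinels(times, values, 150, -1, false)); // [1, 2, 3, -1, 4, 5, 6]
        // Deadlines at 350 ms and 500 ms both fall inside the gap.
        System.out.println(withSentinels(times, values, 150, -1, true));  // [1, 2, 3, -1, -1, 4, 5, 6]
    }
}
```

The timings here deliberately avoid ties between a deadline and a source item. In the TestScheduler examples above, the outcome of such ties depends on the order in which actions were scheduled, which this model does not try to reproduce.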
All of the above is tested with RxJava 1.0.17 using Java 1.8.0_72.