Is there a way (an operator) to shrink/enhance the code below ?
I want to take the first element of an Observable but delay its completion until the source has been completed (this is why I concat with ignoreElements
).
public class SOTest {
private final TestScheduler scheduler = new TestScheduler();
@Test
public void take_first_and_do_not_complete() {
TestObserver<Long> test = Observable.interval(1, TimeUnit.SECONDS, scheduler)
.take(7)
.publish(o -> o
.firstElement()
.toObservable()
.concatWith(o
.doOnNext(e -> System.out.println("to be ignored: " + e))
.ignoreElements()
)
)
.doOnNext(e -> System.out.println("First is: " + e))
.test();
scheduler.advanceTimeTo(1, TimeUnit.SECONDS);
test.assertValueCount(1);
test.assertNotComplete();
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
test.assertValueCount(1);
test.assertNotComplete();
scheduler.advanceTimeTo(7, TimeUnit.SECONDS);
test.assertValueCount(1);
test.assertComplete();
}
}
How about .distinctUntilChanged()
Observable.interval(1, TimeUnit.SECONDS, scheduler)
.take(7)
.distinctUntilChanged((a, b) -> true)
.test();
All items except for the first item are blocked by the comparer. The stream completes when the upstream completes.
Edit:
Even shorter with just .distinct
Observable.interval(1, TimeUnit.SECONDS, scheduler)
.take(7)
.distinct(a -> 0)
.test();