Tags: java, rx-java2

Take the first item and do not complete


Is there an operator that would shorten or improve the code below?

I want to take the first element of an Observable but delay its completion until the source completes (this is why I concat with ignoreElements).

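import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
import org.junit.Test; // assumes JUnit 4; with JUnit 5 use org.junit.jupiter.api.Test

public class SOTest {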

    private final TestScheduler scheduler = new TestScheduler();

    @Test
    public void take_first_and_do_not_complete() {
        TestObserver<Long> test = Observable.interval(1, TimeUnit.SECONDS, scheduler)
                .take(7)
                .publish(o -> o
                        .firstElement()
                        .toObservable()
                        .concatWith(o
                                .doOnNext(e -> System.out.println("to be ignored: " + e))
                                .ignoreElements()
                        )
                )
                .doOnNext(e -> System.out.println("First is: " + e))
                .test();

        scheduler.advanceTimeTo(1, TimeUnit.SECONDS);

        test.assertValueCount(1);
        test.assertNotComplete();

        scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
        test.assertValueCount(1);
        test.assertNotComplete();

        scheduler.advanceTimeTo(7, TimeUnit.SECONDS);
        test.assertValueCount(1);
        test.assertComplete();
    }
}

Solution

  • How about .distinctUntilChanged() with a comparer that always returns true?

    Observable.interval(1, TimeUnit.SECONDS, scheduler)
        .take(7)
        .distinctUntilChanged((a, b) -> true)
        .test();
    
    

    The comparer reports every pair of consecutive items as equal, so every item after the first is dropped. The stream still completes only when the upstream completes.


    Edit:

    Even shorter with just .distinct() and a key selector that maps every item to the same key:

    Observable.interval(1, TimeUnit.SECONDS, scheduler)
        .take(7)
        .distinct(a -> 0)
        .test();
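
To see why both variants let only the first item through, here is a minimal plain-Java sketch of the filtering rule each operator applies. It is a model for illustration only, not RxJava's implementation: the class name `FirstOnlyModel` and its two static methods are hypothetical, and a `List` stands in for the stream. Completion is not modeled; in RxJava both operators simply forward the upstream's completion, which is what delays `onComplete` until the source finishes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical model of the two filters; illustrative only, not RxJava code.
final class FirstOnlyModel {

    // Models distinctUntilChanged(comparer): an item is dropped when the
    // comparer reports it equal to the item seen immediately before it.
    static <T> List<T> distinctUntilChanged(List<T> source, BiPredicate<T, T> comparer) {
        List<T> out = new ArrayList<>();
        boolean hasPrevious = false;
        T previous = null;
        for (T item : source) {
            if (!hasPrevious || !comparer.test(previous, item)) {
                out.add(item);
            }
            previous = item;
            hasPrevious = true;
        }
        return out;
    }

    // Models distinct(keySelector): an item passes only if its key
    // has not been seen before.
    static <T, K> List<T> distinct(List<T> source, Function<T, K> keySelector) {
        Set<K> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(keySelector.apply(item))) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same values that interval(...).take(7) would emit: 0 through 6.
        List<Long> source = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L);
        System.out.println(distinctUntilChanged(source, (a, b) -> true)); // prints [0]
        System.out.println(distinct(source, a -> 0));                     // prints [0]
    }
}
```

One difference worth noting: distinctUntilChanged only remembers the previous item, while distinct keeps a set of every key seen so far. With a constant key that set holds a single entry, so the cost is negligible in this case.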