RxJava - How to test a function with inner "toBlocking" call


I'm trying to add some unit tests to the following piece of code:

public List<Stuff> extractStuff(Scheduler scheduler, List<Token> tokens) {
    List<Observable<List<Stuff>>> observables = tokens
            .stream()
            .map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
            .collect(Collectors.toList());

    List<Stuff> result = new ArrayList<>();
    for (List<Stuff> stuff: Observable.merge(observables).toBlocking().toIterable()) {
        result.addAll(stuff);
    }

    return result;
}

I want the Stuff objects to be fetched in parallel, but I need to collect all of them before going any further (otherwise the next step does not make any sense).

The code works as expected, but I'm struggling with the unit tests:

@Test
public void extractStuff() {
    // GIVEN
    TestScheduler scheduler = new TestScheduler();
    List<Token> tokens = buildTokens();
    ...
    // WHEN
    List<Stuff> result = this.instance.extractStuff(scheduler, tokens);
    // Execution never comes to this point...
    // THEN
    ...
}

Using the debugger, I can see that Observable.just(...) looks good (my list of observables is not empty, and I can see my custom mocked object inside).

Problem: the execution seems to be stuck at the Observable.merge(observables).toBlocking() expression. The line result.addAll is never called.

I have tried several things with the TestScheduler object, but I cannot make it work. Most examples I've found on the Internet deal with a function that returns an Observable, so the associated unit test can call scheduler.advanceTimeBy(...). In my situation I can't apply this approach, as the Observable objects are not returned directly.

Thanks a lot for your help!


Solution

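  • I'm not sure that the code, as posted, is behaving the way you expect. Your getStuffByToken(...) method is actually being invoked on the calling thread, not by the Scheduler: Java evaluates the argument of Observable.just(...) before the Observable is even created, so subscribeOn(scheduler) only affects where the already computed value is emitted.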

    For simplicity, let me replace Token with Integer and Stuff with String.

    My getStuffByToken(...) is going to return the String representation of Integer and will also include the current Thread's name:

    private List<String> getStuffByToken( Integer token )
    {
        return Arrays.asList( token.toString(), Thread.currentThread().getName() );
    }
    

    I might be on a different version of RxJava: I don't have a toBlocking() method (that is the RxJava 1.x API) but I do have blockingIterable(), its counterpart in 2.x and later. I hope this is equivalent:

    public List<String> extractStuff(Scheduler scheduler, List<Integer> tokens) {
        List<Observable<List<String>>> observables = tokens
                .stream()
                .map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
                .collect(Collectors.toList());
    
        List<String> result = new ArrayList<>();
        for (List<String> stuff: Observable.merge(observables).blockingIterable()) {
            result.addAll(stuff);
        }
    
        return result;
    }
    

    If we test the above:

    @Test
    public void testExtractStuff()
    {
        List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
        List<String> result = extractStuff( Schedulers.computation(), tokens );
        System.out.println( result );
    }
    

    We get back:

    [1, main, 3, main, 4, main, 5, main, 2, main]
    

    As you can tell, all the getStuffByToken(...) calls were executed on the main thread.
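
    The effect comes from plain Java argument evaluation rather than from RxJava itself. Here is a minimal sketch of the same behaviour using only the JDK, with CompletableFuture standing in for Observable as an analogy. The class and method names are hypothetical and not part of the original code:

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class EagerVsDeferred {
        // Stand-in for getStuffByToken(...): reports the thread it ran on.
        static String work() {
            return Thread.currentThread().getName();
        }

        // Eager: work() is evaluated on the caller's thread before the future
        // exists, much like Observable.just(getStuffByToken(token)).
        static String eagerThread(ExecutorService pool) {
            return CompletableFuture.completedFuture(work())
                    .thenApplyAsync(name -> name, pool) // only the hand-off uses the pool
                    .join();
        }

        // Deferred: the method reference is handed to the pool, so work()
        // itself runs on a pool thread.
        static String deferredThread(ExecutorService pool) {
            return CompletableFuture.supplyAsync(EagerVsDeferred::work, pool).join();
        }

        // Single daemon thread named "worker" so the result is easy to recognise.
        static ExecutorService newPool() {
            return Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "worker");
                t.setDaemon(true);
                return t;
            });
        }

        public static void main(String[] args) {
            ExecutorService pool = newPool();
            System.out.println("eager ran on:    " + eagerThread(pool));
            System.out.println("deferred ran on: " + deferredThread(pool));
            pool.shutdown();
        }
    }
    ```

    In RxJava, the deferred counterpart would be something like Observable.fromCallable(() -> getStuffByToken(token)).subscribeOn(scheduler), which postpones the call until subscription and therefore runs it on the Scheduler.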

    Next, the reason your test hangs is that TestScheduler does not run anything on its own: scheduled work executes only when the test calls TestScheduler.advanceTimeBy(...) (or triggerActions()). Because extractStuff(...) blocks the test thread before the test can make that call, the merged Observable never emits and the blocking iterator waits forever. A blocking method is therefore not convenient to test with TestScheduler.
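
    To make this concrete, here is a minimal JDK-only sketch of an executor that, like TestScheduler, runs nothing until the test asks it to. ManualExecutor and runPending() are hypothetical names for illustration, not RxJava API, and CompletableFuture stands in for the Rx pipeline:

    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;

    class ManualExecutor implements Executor {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        // Tasks are only queued here; nothing runs until the test says so.
        @Override
        public void execute(Runnable task) {
            pending.add(task);
        }

        // Rough counterpart of advancing a virtual clock: run everything queued so far.
        public void runPending() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }

        public static void main(String[] args) {
            ManualExecutor executor = new ManualExecutor();
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> "stuff", executor);

            // Calling future.join() at this point would block forever: the only
            // thread that could run the queued task is the one that would be waiting.
            System.out.println("done before runPending: " + future.isDone());

            executor.runPending();
            System.out.println("done after runPending:  " + future.isDone());
            System.out.println("value: " + future.join());
        }
    }
    ```

    A blocking call placed before runPending() is the same situation as toBlocking() being reached before advanceTimeBy(...): the work is queued, but nothing is left to trigger it.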

    With both of those points in mind, I suggest doing something along these lines:

    public Single<List<String>> extractStuff( Scheduler scheduler, List<Integer> tokens )
    {
        return Observable.fromIterable( tokens )
            .flatMap( token -> Observable.just( token )
                    .subscribeOn( scheduler )
                    .map( this::getStuffByToken )
                    .flatMap( Observable::fromIterable ))
            .toList();
    }
    

    Your production code can call extractStuff(...).blockingGet() to resolve the List.

    And, you can test as follows:

    @Test
    public void testExtractStuff()
    {
        TestScheduler scheduler = new TestScheduler();
        List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
        TestObserver<List<String>> test = extractStuff( scheduler, tokens ).test();
    
        scheduler.advanceTimeBy( 1, TimeUnit.SECONDS );
        test.assertValueCount( 1 );
        test.assertValue( list -> list.size() == 10 );
        test.assertComplete();
    }