I'm trying to add some unit tests to the following piece of code:
public List<Stuff> extractStuff(Scheduler scheduler, List<Token> tokens) {
    List<Observable<List<Stuff>>> observables = tokens
            .stream()
            .map(token -> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
            .collect(Collectors.toList());
    List<Stuff> result = new ArrayList<>();
    for (List<Stuff> stuff : Observable.merge(observables).toBlocking().toIterable()) {
        result.addAll(stuff);
    }
    return result;
}
I want the Stuff objects to be fetched in parallel, but I need to collect all of them before going any further (otherwise the next process does not make any sense).
The code works as expected, but I'm struggling with the unit tests:
@Test
public void extractStuff() {
    // GIVEN
    TestScheduler scheduler = new TestScheduler();
    List<Token> tokens = buildTokens();
    ...
    // WHEN
    List<Stuff> result = this.instance.extractStuff(scheduler, tokens);
    // Execution never comes to this point...
    // THEN
    ...
}
Using the debugger, I can see that Observable.just(...) looks good (my list of observables is not empty, and I can see my custom mocked object inside).
Problem: the execution seems to be stuck at the Observable.merge(observables).toBlocking() expression. The line result.addAll is never called.
I have tried several things with the TestScheduler object, but I cannot manage to make it work. Most examples I've found on the Internet deal with a function that returns an Observable object, so the associated unit test can run scheduler.advanceTimeBy(...);. In my situation, I can't apply this approach because the Observable objects are not returned directly.
Thanks a lot for your help!
I'm not sure that the code, as posted, behaves the way you expect. Your getStuffByToken(...) method is actually being invoked on the calling thread, not by the Scheduler.
For simplicity, let me replace Token with Integer and Stuff with String.
My getStuffByToken(...) is going to return the String representation of the Integer and will also include the current Thread's name:
private List<String> getStuffByToken( Integer token )
{
    return Arrays.asList( token.toString(), Thread.currentThread().getName() );
}
I might be on a different version of RxJava: I don't have a toBlocking(...) method but I do have blockingIterable(), which I hope is equivalent:
public List<String> extractStuff(Scheduler scheduler, List<Integer> tokens) {
    List<Observable<List<String>>> observables = tokens
            .stream()
            .map(token -> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
            .collect(Collectors.toList());
    List<String> result = new ArrayList<>();
    for (List<String> stuff : Observable.merge(observables).blockingIterable()) {
        result.addAll(stuff);
    }
    return result;
}
If we test the above:
@Test
public void testExtractStuff()
{
    List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
    List<String> result = extractStuff( Schedulers.computation(), tokens );
    System.out.println( result );
}
We get back:
[1, main, 3, main, 4, main, 5, main, 2, main]
As you can tell, all the getStuffByToken(...) calls were executed on the main Thread.
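The cause is ordinary Java argument evaluation: an expression passed as a method argument is computed by the caller before the method runs, so wrapping an already computed value cannot move the work to another thread. Below is a minimal sketch of that difference using only the JDK. It is an analogy, not RxJava code: CompletableFuture stands in for Observable, an ExecutorService stands in for the Scheduler, and the work(...) helper and the class name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EagerVsDeferred {
    // Hypothetical stand-in for getStuffByToken(...): reports the thread it ran on.
    static String work(int token) {
        return token + "@" + Thread.currentThread().getName();
    }

    // Eager: work(token) is an argument expression, so it is evaluated on the
    // caller's thread before completedFuture(...) is even invoked.
    static String eager(int token) {
        return CompletableFuture.completedFuture(work(token)).join();
    }

    // Deferred: the lambda is only a recipe; a pool thread runs work(token) later.
    static String deferred(int token, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> work(token), pool).join();
    }

    public static void main(String[] args) {
        // Single worker thread with a recognizable name.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            System.out.println(eager(1));          // computed on the calling thread
            System.out.println(deferred(2, pool)); // computed on the "worker" thread
        } finally {
            pool.shutdown();
        }
    }
}
```

In RxJava the deferred variant would typically be written with something like Observable.fromCallable(() -> getStuffByToken(token)).subscribeOn(scheduler), or by mapping after subscribeOn(...) as in the suggestion further down.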
Next, the reason your test never gets past the extractStuff(...) call is that TestScheduler requires the invocation of TestScheduler.advanceTimeBy(...) to simulate the passing of time, which is what drives the processing of your Rx pipeline. Because your method blocks the test thread until the pipeline completes, the test never gets a chance to advance the scheduler, so the method is not convenient to test with TestScheduler.
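To make the hang easier to picture, here is a rough JDK-only sketch of a scheduler that, like a test scheduler, only queues work until it is told to run it. ManualExecutor and runAll() are hypothetical names invented for this illustration; this is not how RxJava implements TestScheduler, only the same idea in miniature.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class ManualExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        queue.add(task); // only remember the task; never run it here
    }

    // Rough counterpart of advancing the test clock: run everything queued so far.
    void runAll() {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        ManualExecutor executor = new ManualExecutor();
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "stuff", executor);

        // Nothing has run yet. Calling future.join() at this point would block
        // forever, because the only thread able to call runAll() is this one.
        System.out.println(future.isDone());

        executor.runAll(); // the queued task runs now, on the calling thread
        System.out.println(future.isDone());
        System.out.println(future.join());
    }
}
```

A blocking extractStuff(...) is in the same position as a join() placed before runAll(): it waits for work that only the waiting thread could trigger.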
Armed with both pieces of information above, I suggest doing something along these lines:
public Single<List<String>> extractStuff( Scheduler scheduler, List<Integer> tokens )
{
    return Observable.fromIterable( tokens )
            .flatMap( token -> Observable.just( token )
                    .subscribeOn( scheduler )
                    .map( this::getStuffByToken )
                    .flatMap( Observable::fromIterable ) )
            .toList();
}
Your production code can call extractStuff(...).blockingGet() to resolve the List.
And you can test it as follows:
@Test
public void testExtractStuff()
{
    TestScheduler scheduler = new TestScheduler();
    List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
    TestObserver<List<String>> test = extractStuff( scheduler, tokens ).test();
    scheduler.advanceTimeBy( 1, TimeUnit.SECONDS );
    test.assertValueCount( 1 );
    test.assertValue( list -> list.size() == 10 );
    test.assertComplete();
}