java, rx-android, rx-java2

RxJava 2 - Call Completable after another Completable


I'm new to RxJava and have run into the following issue.

I have two Completable objects that store some data. I'd like to trigger the first one and start the second only after the first has completed successfully. If the first one finishes with an error, the second one should be skipped.

From the documentation and other SO questions, it seems that concatWith or andThen should do this. But in both manual testing and a unit test I can see that the second Completable is triggered in parallel with the first one :/

First completable

public Completable doA() {
  Log.d(TAG, "class call");

  return db.countRows()
    .doOnSuccess(integer -> {
      Log.d(TAG, "found rows: "+integer);
    })
    .doOnError(Throwable::printStackTrace)
    .flatMapCompletable(this::customAction);
}

private Completable customAction(final int count) {
  Log.d(TAG, "count: "+count);
  if (count > 0) {
    Log.d(TAG, "rows already exist, skip");
    return Completable.complete();
  }

  final User user = ...
  return db.save(user); // returns a Completable
}

Second Completable

public Completable doB() {
  Log.d(TAG, "call to B");
  // ...
}

Attempt to invoke B after A

public Completable someMethod() {
    Log.d(TAG, "someMethod");
    return doA()
        .andThen(doB());
        // this also doesn't work
        //.concatWith(doB());
}

The subscription

someMethod()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnComplete(() -> {
      Log.d(TAG, "complete");
      // ...
    })
    .doOnError(throwable -> {
      Log.d("Main", "error "+throwable.getMessage());
      // ...
    })
    .subscribe();

When I run my app and check the logs, I see:

D/Some method: some method
D/DoA: class call
D/DoB: class call   // <- why here?
D/DoA: found rows: 0
D/DoA: count: 0

Also the following unit test fails:

@Test
public void test() {
  when(doa.doA()).thenReturn(Completable.error(new Exception("test")));

  observe(); // subscription with TestObserver

  verify(dob, never()).doB(); // fails with NeverWantedButInvoked
}

What am I missing?


Solution

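  • Because you called doB() yourself. Java evaluates the argument of andThen(doB()) while the chain is being assembled, before anything has been subscribed to, so the body of doB() runs right after the body of doA(). Here is your flow with both calls inlined: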

    public Completable someMethod() {
        Log.d(TAG, "someMethod");
    
        // doA() inlined
        Log.d(TAG, "class call"); // runs immediately, at assembly time
        Completable a = ...
    
        // doB() inlined
        Log.d(TAG, "class call"); // also runs immediately, before a has been subscribed to
        Completable b = ...
    
        return a.andThen(b);
    }
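
    The ordering can be reproduced without RxJava. Below is a minimal sketch in plain Java. Step is a hypothetical stand-in for Completable, and its andThen and defer only imitate the behaviour of the real operators; none of this is RxJava code. It compares passing doB() directly with wrapping the call so that it happens only once the first step has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class DeferDemo {
    /** Hypothetical stand-in for Completable: work that runs only when subscribed. */
    interface Step {
        void subscribe();

        /** Runs this step, then the next one (imitates Completable.andThen). */
        default Step andThen(Step next) {
            return () -> { subscribe(); next.subscribe(); };
        }

        /** Builds the step only at subscription time (imitates Completable.defer). */
        static Step defer(Supplier<Step> factory) {
            return () -> factory.get().subscribe();
        }
    }

    static final List<String> log = new ArrayList<>();

    static Step doA() {
        log.add("doA called");           // assembly-time side effect
        return () -> log.add("A runs");  // subscription-time work
    }

    static Step doB() {
        log.add("doB called");           // assembly-time side effect
        return () -> log.add("B runs");  // subscription-time work
    }

    /** doB() is an ordinary argument, so it is evaluated before A runs. */
    public static List<String> eager() {
        log.clear();
        doA().andThen(doB()).subscribe();
        return new ArrayList<>(log);
    }

    /** doB() is wrapped in a lambda, so it is called only after A has run. */
    public static List<String> deferred() {
        log.clear();
        doA().andThen(Step.defer(() -> doB())).subscribe();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(eager());    // [doA called, doB called, A runs, B runs]
        System.out.println(deferred()); // [doA called, A runs, doB called, B runs]
    }
}
```

    In RxJava 2 the real operator for this is Completable.defer, so the chain in someMethod() would become doA().andThen(Completable.defer(() -> doB())). With that change doB() is invoked only when doA() completes, and it is never invoked if doA() signals an error, which is what the verify(dob, never()).doB() assertion expects.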