Search code examples
javarx-java

RxJava; How to emit observables synchronously


I want to synchronously emit two Observable objects (which are asynchronous), one after the other where it returns the first emitted Observable object. If the first one fails, it should not emit the second one.

Let's say we have one Observable that signs a user in, and another Observable that automatically selects the user's account, after signing in.

This is what I tried:

public Observable<AccessToken> signInAndSelectAccount(String username, String password)
{

    Observable<AccessToken> ob1 = ...; // Sign in.
    Observable<Account> ob2 = ...; // Select account.


    return Observable.zip(
            ob1,
            ob2,
            new Func2<AccessToken, Account, AccessToken>() {
                @Override
                public AccessToken call(AccessToken accessToken, Account account)
                {
                     return accessToken;
                }
            });
}

This unfortunately does not work for my use case. It will emit/call both observables parallel, starting with 'ob1'.

Did someone encounter a similar use case? Or has an idea on how to make observables wait for eachother in a synchronous way, where the first emitted can be returned?

Thanks in advance.


Solution

  • There is no such term as "wait" in reactive programming. You need to think about creating of a data stream, where one Observable could be triggered by another. In your case after receiving token you need to receive account. It could look like this:

    Observable<Account> accountObservable = Observable.create(new Observable.OnSubscribe<AccessToken>() {
        @Override public void call(Subscriber<? super AccessToken> subscriber) {
            subscriber.onNext(new AccessToken());
            subscriber.onCompleted();
        }
    }).flatMap(accessToken -> Observable.create(new Observable.OnSubscribe<Account>() {
        @Override public void call(Subscriber<? super Account> subscriber) {
            subscriber.onNext(new Account(accessToken));
            subscriber.onCompleted();
        }
    }));