Search code examples
javarx-javareactivex

Subscribing an observable inside another Observable


I was debugging my code which fetches UserWallets from database and then generates addresses for them by connecting with external REST API. Now I have a subscribe nested inside another subscribe but I read it's bad solution (It actually doesn't work and I think that's the reason).

userWalletDao.getUnregisteredUserWallets()
                .subscribe(nextWallet -> {
                    log.info("Fetched next wallet for registration {}", nextWallet);
                    blockchainIntegration.registerUserWallet(nextWallet.getUserId())
                            .subscribe(address -> {
                                nextWallet.setAddress(address);
                                userWalletDao.persistUserWalletAddress(nextWallet);
                                log.info("Registered wallet {} with address {}.", nextWallet, address);
                            });
                });

I was trying to make it in one subscribe, but if I flatMap wallets to addresses I lose a UserWallet object to set an fetched address for it and persist it back in the database.

How can I fetch wallets and then call an API to generate an address for it with one subscribe?

getUnregisteredUserWallets() returns Observable<UserWallet> and registerUserWallet() returns Single<String>.


Solution

  • It is highly recommened to read and understand about Dependent sub-flows mentioned in the first comment.

    You can solve your problem by changing your observable sequence to something like this

           userWalletDao.getUnregisteredUserWallets()
                    .flatMap(nextWallet -> registerUserWallet(nextWallet.getUserId()).toObservable()
                            .flatMap(address -> Observable.fromCallable(() -> new Pair<>(nextWallet, address))))  // return both wallet from previous mapping and address from current mapping to the next level
                    .flatMapCompletable(walletAddressPair -> Completable.fromAction(()->{
                        Wallet nextWallet = walletAddressPair.first;
                        String address = walletAddressPair.second;
                        nextWallet.setAddress(address);
                        userWalletDao.persistUserWalletAddress(nextWallet);
                        log.info("Registered wallet {} with address {}.", nextWallet, address);
                        // here wallet and address have been saved to db. This operation is a completable action, you don't have to return any result 
                        // from it and forward to the next level.  Thats why flatMapCompletable is used.
                    }))
                    .subscribeWith(new DisposableCompletableObserver() {
                        @Override
                        public void onComplete() {
                           // All actions completed
                        }
    
                        @Override
                        public void onError(Throwable e) {
                          // any error occurred in the observable chain
                        }
                    });