I was debugging my code, which fetches UserWallets from the database and then generates addresses for them by calling an external REST API. Right now I have a subscribe nested inside another subscribe, but I have read that this is a bad solution (it actually doesn't work, and I think that's the reason).
userWalletDao.getUnregisteredUserWallets()
    .subscribe(nextWallet -> {
        log.info("Fetched next wallet for registration {}", nextWallet);
        blockchainIntegration.registerUserWallet(nextWallet.getUserId())
            .subscribe(address -> {
                nextWallet.setAddress(address);
                userWalletDao.persistUserWalletAddress(nextWallet);
                log.info("Registered wallet {} with address {}.", nextWallet, address);
            });
    });
I tried to do it in one subscribe, but if I flatMap wallets to addresses I lose the UserWallet object that I need in order to set the fetched address on it and persist it back to the database.
How can I fetch the wallets and then call the API to generate an address for each of them with a single subscribe?
getUnregisteredUserWallets() returns Observable<UserWallet> and registerUserWallet() returns Single<String>.
It is highly recommended to read up on the dependent sub-flows mentioned in the first comment.
You can solve your problem by changing your observable sequence to something like this:
userWalletDao.getUnregisteredUserWallets()
    .flatMap(nextWallet -> blockchainIntegration.registerUserWallet(nextWallet.getUserId())
        .toObservable()
        // pass both the wallet from the outer mapping and the address from the inner one to the next level
        .map(address -> new Pair<>(nextWallet, address)))
    .flatMapCompletable(walletAddressPair -> Completable.fromAction(() -> {
        UserWallet nextWallet = walletAddressPair.first;
        String address = walletAddressPair.second;
        nextWallet.setAddress(address);
        userWalletDao.persistUserWalletAddress(nextWallet);
        log.info("Registered wallet {} with address {}.", nextWallet, address);
        // At this point the wallet and its address have been saved to the db. This operation is a
        // completable action: it has no result to forward to the next level, which is why
        // flatMapCompletable is used.
    }))
    .subscribeWith(new DisposableCompletableObserver() {
        @Override
        public void onComplete() {
            // all actions completed
        }

        @Override
        public void onError(Throwable e) {
            // an error occurred somewhere in the observable chain
        }
    });
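The core of the dependent sub-flow idea is that the inner mapping is written inside the outer lambda, so the outer value is still in scope when the inner result arrives. Here is a minimal sketch of just that scoping trick. Note that it deliberately uses the JDK's CompletableFuture instead of RxJava so that it runs without any dependencies, and that the UserWallet class, the registerUserWallet stub and the "addr-" format are hypothetical stand-ins, not the real types from the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class DependentSubFlowSketch {

    // Hypothetical stand-in for the UserWallet entity from the question.
    static final class UserWallet {
        final long userId;
        String address;

        UserWallet(long userId) {
            this.userId = userId;
        }
    }

    // Hypothetical stand-in for blockchainIntegration.registerUserWallet(userId);
    // it fabricates an address asynchronously instead of calling a REST API.
    static CompletableFuture<String> registerUserWallet(long userId) {
        return CompletableFuture.supplyAsync(() -> "addr-" + userId);
    }

    // The inner thenApply is nested inside the outer lambda, so `wallet` is still
    // in scope when the address arrives and no Pair is needed to carry it along.
    static List<UserWallet> registerAll(List<UserWallet> wallets) {
        List<CompletableFuture<UserWallet>> pending = wallets.stream()
            .map(wallet -> registerUserWallet(wallet.userId)
                .thenApply(address -> {
                    wallet.address = address;
                    return wallet;
                }))
            .collect(Collectors.toList());

        // Wait for every registration to finish, keeping the input order.
        return pending.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<UserWallet> done = registerAll(Arrays.asList(new UserWallet(1), new UserWallet(2)));
        for (UserWallet w : done) {
            System.out.println(w.userId + " -> " + w.address);
        }
    }
}
```

In RxJava the same shape is obtained by applying the mapping to the inner Single or Observable inside the outer flatMap lambda, where the wallet is still visible, rather than chaining it after the flatMap.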