Tags: android, rx-java, reactive-programming, rx-android, rx-kotlin

Composing two observable sources using one as a predicate in RxJava


I have two observable sources: fetchProductList() returns an Observable<Datasource>, and canInvite.execute() takes a value and returns a Single<Boolean>. I need to compose the stream so that every emission from the first source is fed to the second, and then return, as a list, all the emissions from fetchProductList() for which the result is true. I tried composing it this way, but it looks a bit clunky. Is there a better or simpler way to do it?

fetchProductList()
    .map { dataSource -> dataSource.data }
    .flatMap { data ->
        Observable.from(data.roles.map { role ->
            canInvite.execute(role.key).map { canInvite ->
                role to canInvite
            }
        })
    }
    .compose { it -> Single.merge(it) }
    .filter { it -> it.second == true }
    .map { it -> it.first }
    .toList()
    .subscribe {

Solution

  • In my opinion, this is a somewhat clearer solution:

    fetchProductList()
        .map { dataSource -> dataSource.data }
        .flatMap { data ->
            Observable.fromIterable(data.roles)
                .flatMapMaybe { role ->
                    canInvite.execute(role.key)
                        .flatMapMaybe { result ->
                            if (result) Maybe.just(role) else Maybe.empty()
                        }
                }
        }
        .toList()
        .subscribe { result -> println(result) }
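
    For anyone who wants to try this out, here is a minimal self-contained sketch of the same idea. It assumes RxJava 2 (the io.reactivex package) is on the classpath. The Role, Data and Datasource classes, the stubbed fetchProductList(), and the canInvite() function standing in for canInvite.execute() are all hypothetical placeholders, not the asker's real code. It also swaps the inner if/else for Single.filter, which turns a Single<Boolean> into a Maybe that is empty when the value is false.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical stand-ins for the types mentioned in the question.
data class Role(val key: String)
data class Data(val roles: List<Role>)
data class Datasource(val data: Data)

// Stub (assumption): emits a single Datasource carrying three roles.
fun fetchProductList(): Observable<Datasource> =
    Observable.just(
        Datasource(Data(listOf(Role("admin"), Role("guest"), Role("editor"))))
    )

// Stub (assumption) for canInvite.execute(): every role except "guest" may invite.
fun canInvite(key: String): Single<Boolean> = Single.just(key != "guest")

// Collects the roles for which the predicate source emitted true.
fun invitableRoles(): Single<List<Role>> =
    fetchProductList()
        .map { it.data }
        .flatMap { data ->
            Observable.fromIterable(data.roles)
                .flatMapMaybe { role ->
                    canInvite(role.key)
                        .filter { allowed -> allowed } // Maybe is empty when false
                        .map { role }                  // keep the role, drop the Boolean
                }
        }
        .toList()

fun main() {
    println(invitableRoles().blockingGet().map { it.key })
}
```

    Note that flatMapMaybe does not promise to keep the original order once canInvite does real asynchronous work. If order matters, concatMapMaybe (available in later RxJava 2 releases, as far as I know) should be a drop-in replacement at the cost of running the checks one at a time.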