Search code examples
androidandroid-studiorx-javareactive-programmingrx-java2

RxJava Flowable.zip never returns a value


I have a code in my repository which has to call two endpoints. I have used Flowable.zip() but it doesn't seem to return a value. The Call doesn't fail even if there is no network available.

    fun fetchRateRemote(): Flowable<ResultWrapper<List<RateModel>>> {
    return Flowable.zip<Flowable<CurrenciesDTO>, Flowable<RateDTO>, ResultWrapper<List<RateModel>>>(
            {
        apiEndpoints.fetchCurrencies(key)
    }, {
        apiEndpoints.fetchRate(key)
    }, { t1, t2 ->
        val rateList = mutableListOf<RateModel>()
        t2.subscribe { rate->
            for((k,v) in rate.quotes ){
                val currency = k.removeRange(0,3)
                t1.subscribe {cur->
                    val currencyName = cur.currencies[currency]
                    if (currencyName != null) {
                        rateList.add(RateModel("$currencyName ($currency)", v.toString()))
                    }
                }
            }
        }
        ResultWrapper.Success(rateList)
    }).subscribeOn(Schedulers.io())
}

I use a wrapper to mimic state and this is what I do in my viewmodel.

private fun fetchRates(){
    disposable.add(repository.fetchRateRemote()
            .startWith(ResultWrapper.Loading)
            .onErrorReturn {
                ResultWrapper.Error(it)
            }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(object : DisposableSubscriber<ResultWrapper<List<RateModel>>>() {
                override fun onComplete() {}

                override fun onNext(rate: ResultWrapper<List<RateModel>>) {
                    rates.postValue(rate)
                }

                override fun onError(error: Throwable) {
                    error.printStackTrace()
                }
            })
    )
}

I then observe rate in my activity via LiveData. The wrapper or the observation isn't the issue. It works with other calls, I do not know why the zip call doesn't work. I'm fairly new to RxJava so If I didn't implement something correctly in my repository please help correct me.


Solution

  • Okay! I made a lot of mistakes with the code in the repository above but I managed to fix it. Here's the solution. The Type arguments for the zip method was wrong! I didn't call the BiFunction argument properly too.

        fun fetchRateRemote(): Flowable<ResultWrapper<List<RateModel>>> {
        return Flowable.zip<CurrenciesDTO, RateDTO, ResultWrapper<List<RateModel>>>(
                apiEndpoints.fetchCurrencies(key), apiEndpoints.fetchRate(key), BiFunction { t1, t2 ->
            val rateList = mutableListOf<RateModel>()
            for((k,v) in t2.quotes ){
                val currencyCode = k.removeRange(0,3)
                val currencyName = t1.currencies[currencyCode]
                if (currencyName != null) {
                    rateList.add(RateModel("$currencyName ($currencyCode)", v.toString()))
                }
            }
            ResultWrapper.Success(rateList)
        }).subscribeOn(Schedulers.io())
    }