How to group data from async sources using RxJava


I'm working on a trading app. When the user selects some products, I need to show, for each product, whether the market is open and its latest price. For example, the user can select 2 products, and I must show the data only once I have all the info for both products. The data can change at any time (e.g. the market for one of the products closes). This is my code:

data class ProductInfo(
    val productCode: String,
    val isMarketOpen: Boolean,
    val latestPrice: BigDecimal,
)

// This observable could emit at any time due to user interaction with the UI
private fun productsCodeObservable(): Observable<List<String>> = Observable.just(listOf("ProductA", "ProductB"))

// Markets have different working hours
private fun isMarketOpenObservable(productCode: String): Observable<Boolean> {
    return Observable.interval(2, TimeUnit.SECONDS)
            .map {
                // TODO: Use API to determine if the market is open for productCode
                it.toInt() % 2 == 0
            }
}

// The product price fluctuates so this emits every X seconds
private fun latestPriceObservable(productCode: String): Observable<BigDecimal> {
    return Observable.interval(2, TimeUnit.SECONDS)
            .map { productPrice -> productPrice.toBigDecimal() }
}

@Test
fun `test`() {

    val countDownLatch = CountDownLatch(1)

    productsCodeObservable()
            .switchMap { Observable.fromIterable(it) }
            .flatMap { productCode ->
                Observable.combineLatest(
                        isMarketOpenObservable(productCode),
                        latestPriceObservable(productCode)
                ) { isMarketOpen, latestPrice ->
                    ProductInfo(productCode, isMarketOpen, latestPrice)
                }
            }
            .toList()
            .doOnSuccess { productsInfo ->
                println(productsInfo)
            }
            .subscribe()

    countDownLatch.await()
}

The test method never prints anything, and I don't know why. I don't know much about RxJava, but my understanding is that toList is not working because the source observables never complete. Any idea how I can collect the data for the product codes and emit a list whenever any of the data changes? :)


Solution

  • If you want to receive a new product info list every time any of these products changes:

    productsCodeObservable()
        .switchMap { list ->
            val productInfoObservables = list.map { productCode ->
                Observable.combineLatest(
                    isMarketOpenObservable(productCode),
                    latestPriceObservable(productCode)
                ) { isMarketOpen, latestPrice ->
                    ProductInfo(productCode, isMarketOpen, latestPrice)
                }
            }
            // The combiner receives an Array<Any>; cast each element instead of the whole list
            Observable.combineLatest(productInfoObservables) { it.map { info -> info as ProductInfo } }
        }
        .doOnNext { productsInfoList ->
            println(productsInfoList)
        }
        .subscribe()
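
The original pipeline prints nothing because toList() emits only when its upstream completes, and Observable.interval never completes. The sketch below is one way to check the combineLatest approach deterministically, without timers. It is not a definitive implementation. It assumes RxJava 3 package names (for RxJava 2, swap io.reactivex.rxjava3 for io.reactivex). The helper productInfoList and the PublishSubject stand-ins for the API streams are hypothetical names introduced only for this illustration. It also guards the empty selection, since combineLatest over an empty collection completes without emitting anything.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject
import java.math.BigDecimal

data class ProductInfo(
    val productCode: String,
    val isMarketOpen: Boolean,
    val latestPrice: BigDecimal,
)

// Hypothetical helper (not from the original post): turns a stream of selected
// product codes into a stream of complete ProductInfo lists.
fun productInfoList(
    productCodes: Observable<List<String>>,
    isMarketOpen: (String) -> Observable<Boolean>,
    latestPrice: (String) -> Observable<BigDecimal>,
): Observable<List<ProductInfo>> =
    productCodes.switchMap { codes ->
        if (codes.isEmpty()) {
            // combineLatest over zero sources completes without emitting,
            // so emit an empty list explicitly for an empty selection.
            Observable.just(emptyList<ProductInfo>())
        } else {
            val perProduct = codes.map { code ->
                Observable.combineLatest(isMarketOpen(code), latestPrice(code)) { open, price ->
                    ProductInfo(code, open, price)
                }
            }
            // Emits only after every product has produced at least one ProductInfo,
            // then again whenever any of them changes.
            Observable.combineLatest(perProduct) { values ->
                values.map { it as ProductInfo }
            }
        }
    }

fun main() {
    // Subjects stand in for the real API-backed streams (assumption for the demo).
    val openA = PublishSubject.create<Boolean>()
    val priceA = PublishSubject.create<BigDecimal>()
    val openB = PublishSubject.create<Boolean>()
    val priceB = PublishSubject.create<BigDecimal>()

    val observer = productInfoList(
        Observable.just(listOf("ProductA", "ProductB")),
        { code -> if (code == "ProductA") openA else openB },
        { code -> if (code == "ProductA") priceA else priceB },
    ).test()

    openA.onNext(true)
    priceA.onNext(BigDecimal("10"))
    observer.assertNoValues()      // ProductB has not reported yet

    openB.onNext(false)
    priceB.onNext(BigDecimal("20"))
    observer.assertValueCount(1)   // first complete list

    openB.onNext(true)             // ProductB's market state changes
    observer.assertValueCount(2)   // a fresh list is emitted
}
```

Because switchMap is used on the outer stream, a new selection of product codes unsubscribes from the previous products' streams and starts over with the new set.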