Search code examples
androidrx-javarx-java2rx-kotlinrx-kotlin2

Converting loop with condition into RxJava stream


I have code that does blocking operation in while loop (downloads some data from a server). Client does not know how many items are going to be returned in each step. Loop breaks when N items are downloaded.

val n = 10
val list = ArrayList<T>()

while (list.size < n) {
    val lastItemId = list.last()?.id ?: 0
    val items = downloadItems(lastItemId)
    list.addAll(items)
}

downloadItems performs blocking HTTP call and returns list. Now let's assume downloadItems changes and new return type is Observable<Item>. How could I change the code to use RxJava without performing something like blockingGet?


Solution

  • You could use repeatUntil to achieve this:

    var totalItems = 0    
    var id = 0
    Observable.fromCallable {
                downloadItems(id)
            }
            .flatMap {
                list ->
                    totalItems += list.size
                    id = list.last()?.id ?: 0
                    Observable.just(list)
            }
            .repeatUntil({totalItems > n})
            .subscribe({result -> System.out.println(result) })