rx-java2 rx-kotlin2

How can you create a timer that works on a List in Rx?


I want to wait until every item in a list has been found before completing; if the whole list isn't found in time, an exception (a timeout or a custom one) should be thrown. This is like the built-in Observable.timeout(), except that instead of being satisfied as soon as the first item is emitted, it should require all of the items in the list to be found.

Here is an example. Let's say I have a test function that returns an Observable<FoundNumber>. It looks like this:

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

val emittedList: List<String?> = listOf(null, "202", "302", "400")

data class FoundNumber(val numberId: String?)

// Emits one FoundNumber per second, one for each entry of emittedList
fun scanNumbers(): Observable<FoundNumber> = Observable
    .intervalRange(0, emittedList.size.toLong(), 0, 1, TimeUnit.SECONDS)
    .map { index -> FoundNumber(emittedList[index.toInt()]) }

That function is then called to get numbers, which are compared against a list of expected numbers. It doesn't matter if scanNumbers emits additional numbers that aren't in the "target" list; those are simply ignored. Something like this:

val expectedNumbers = listOf("202", "302", "999")

scanForNumbers(expectedNumbers)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe { value -> Log.d(TAG, "Was returned a $value") }

The expected numbers (202, 302, and 999) don't all match the numbers that will be emitted (202, 302, and 400), so a timeout SHOULD occur. With the built-in Observable.timeout(), however, it will not time out, since at least one item was observed.
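To make the behaviour described above concrete, here is a minimal sketch. It assumes the built-in operator in question is RxJava 2's timeout(long, TimeUnit, Scheduler), and it uses a TestScheduler (virtual time) so it runs instantly; the function name plainTimeout is made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Four items, one per second, then completion (the same shape as scanNumbers()).
// plainTimeout is a hypothetical helper name used only for this illustration.
fun plainTimeout(scheduler: TestScheduler): TestObserver<Long> =
    Observable
        .intervalRange(0, 4, 0, 1, TimeUnit.SECONDS, scheduler)
        // timeout() only fires when the wait for the next item exceeds 5 seconds
        .timeout(5, TimeUnit.SECONDS, scheduler)
        .test()

fun main() {
    val scheduler = TestScheduler()               // virtual time
    val observer = plainTimeout(scheduler)
    scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
    observer.assertNoErrors()                     // no TimeoutException was raised
    observer.assertComplete()                     // the stream simply completed
}
```

Because the source finishes after three seconds and no gap between items exceeds five seconds, the stream completes normally, regardless of which numbers were actually seen.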

Here is kind of what I'd like to have. Anyone know how to code this up in RxJava/RxKotlin?

fun scanForNumbers(targets: List<String>): Observable<FoundNumber> {
    // Accumulator: the targets still missing, paired with the item matched at this step (if any)
    val accumulator: Pair<Set<String>, FoundNumber?> = targets.toSet() to null
    return scanNumbers()
        .SPECIAL_TIMEOUT_FOR_LIST(5, TimeUnit.SECONDS, targets)  // placeholder for the operator I'm looking for
        .scan(accumulator) { acc, next ->
            val (remaining, _) = acc
            val id = next.numberId
            if (id != null && id in remaining) {
                (remaining - id) to next      // matched: drop it from the remaining targets
            } else {
                remaining to null             // not a target, or already found
            }
        }
        .filter {
            Log.d(TAG, "Filtering on ${it.second}")
            it.second != null                 // keep only steps that matched a target
        }
        .take(targets.size.toLong())          // stop once every target has been found
        .map { it.second!! }                  // unwrap the item from the pair
        .map { FoundNumber(it.numberId) }     // wrap in your class
}

How do you code a way to time out on a list as described, ideally using RxJava/RxKotlin?


Solution

  • I think I get it now: you want the timeout to start counting from the moment you subscribe, not after you observe items.

    If this is what you need, then the takeUntil operator could help you:

    return scanNumbers()
        .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
        .scan(accumulator) { acc, next -> ...
    

    In this case, the timer starts counting as soon as you subscribe. If the main observable completes before the timer fires, great; if not, the timer completes the main observable anyway.
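As a rough illustration of that cutoff behaviour, here is a small sketch that runs on a TestScheduler (virtual time). The helper name cutOffAfter and the two-second interval are assumptions made for the demo, not part of the question's code.

```kotlin
import io.reactivex.Observable
import io.reactivex.Scheduler
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: lets items through until `seconds` have passed since subscription.
fun <T : Any> cutOffAfter(source: Observable<T>, seconds: Long, scheduler: Scheduler): Observable<T> =
    source.takeUntil(Observable.timer(seconds, TimeUnit.SECONDS, scheduler))

fun main() {
    val scheduler = TestScheduler()
    // Ten items, one every 2 seconds: values 0, 1, 2 arrive at t = 0 s, 2 s, 4 s.
    val source = Observable.intervalRange(0, 10, 0, 2, TimeUnit.SECONDS, scheduler)

    val observer = cutOffAfter(source, 5, scheduler).test()
    scheduler.advanceTimeBy(10, TimeUnit.SECONDS)

    observer.assertNoErrors()
    observer.assertComplete()        // the timer ended the stream at t = 5 s
    println(observer.values())       // items received before the cutoff
}
```

The stream ends with a normal completion at the five-second mark, no matter how many items the source still had left to emit.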

    But takeUntil by itself will not throw an error; it will just complete. If you need it to end with an error, you could use the following combination:

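    return scanNumbers()
        .takeUntil(
            Observable
                // java.util.concurrent.TimeoutException; swap in a custom exception if you prefer
                .error<Unit>(TimeoutException("timeout!"))
                // delayError = true holds the error back for 5 seconds instead of signalling it at once
                .delay(5, TimeUnit.SECONDS, true))
        .scan(accumulator) { acc, next -> ...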
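Putting the pieces together, one possible end-to-end sketch follows. Several things in it are assumptions rather than part of the question or the snippets above: the Progress helper class, the extra source and scheduler parameters (added so the function can be exercised with a TestScheduler), and the concatWith(Observable.never()) step. That last step is there because the sample source completes on its own after a few seconds, and without it the stream would end quietly instead of waiting for the deadline.

```kotlin
import io.reactivex.Observable
import io.reactivex.Scheduler
import io.reactivex.schedulers.Schedulers
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

data class FoundNumber(val numberId: String?)

// Hypothetical helper: targets still missing, plus the item matched at this step (if any).
data class Progress(val remaining: Set<String>, val match: FoundNumber?)

fun scanForNumbers(
    source: Observable<FoundNumber>,
    targets: List<String>,
    timeoutSeconds: Long,
    scheduler: Scheduler = Schedulers.computation()
): Observable<FoundNumber> {
    val wanted = targets.toSet()
    // An error that is held back until the deadline, counted from subscription.
    val deadline = Observable
        .error<Unit>(TimeoutException("not all targets found in $timeoutSeconds s"))
        .delay(timeoutSeconds, TimeUnit.SECONDS, scheduler, true)
    return source
        .concatWith(Observable.never<FoundNumber>()) // keep waiting if the source ends early
        .takeUntil(deadline)
        .scan(Progress(wanted, null)) { acc, next ->
            val id = next.numberId
            if (id != null && id in acc.remaining) Progress(acc.remaining - id, next)
            else Progress(acc.remaining, null)
        }
        .filter { it.match != null }      // drop the seed and non-matching steps
        .map { it.match!! }
        .take(wanted.size.toLong())       // complete (and cancel the deadline) once all are found
}

fun main() {
    val scheduler = TestScheduler()       // virtual time
    val emitted = listOf(null, "202", "302", "400")
    val source = Observable
        .intervalRange(0, emitted.size.toLong(), 0, 1, TimeUnit.SECONDS, scheduler)
        .map { FoundNumber(emitted[it.toInt()]) }

    val observer = scanForNumbers(source, listOf("202", "302", "999"), 5, scheduler).test()
    scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
    observer.assertError(TimeoutException::class.java)  // "999" never showed up
}
```

With the targets from the question the stream delivers 202 and 302 and then fails at the five-second mark; with only 202 and 302 as targets it completes as soon as both have been seen.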