
Checking a list and timing out if all entries not found in RxJava/RxKotlin


I have a function, scanForTargets, that returns an Observable of type FoundNumber. From each FoundNumber I only need an ID field. As each element arrives from the scanResult Observable, I want to check whether its name field matches one of the names on a target list, and emit it if it does. For example, if I am looking for numbers 1 and 2, and scanForTargets() emits 1, 2, 3, and 4, then I want scanForValues to emit only 1 and 2.

The caveat is that I only want to continue doing this until either: 1) a time period elapses (in which case I throw an error), or 2) all items on the String list are found before the timeout.

What I have so far looks like this, but I cannot get it to work, mostly because of the short-circuit: stopping as soon as all of the targets have been found, if that happens before the timeout.

fun scanForValues(targetList: List<String>): Observable<FoundNumber> {
    val scanResult = scanForTargets()

    return scanResult.doOnNext {scanResult -> Log.d(TAG, "Found potential target: " + scanResult.name) }
            .filter(TargetPredicate(targetList)) //See if it's one of those we want
            .timeout(5, TimeUnit.SECONDS) //Wait a max of 5 seconds to find all items
            .doOnError { Log.w(TAG, "Failed to scan") }
            .map{s->scanResult.name}  
}

class TargetPredicate(private val targetList: List<String>) : Predicate<ScanResult> {
    override fun test(scanResult: ScanResult): Boolean {
        if(scanResult == null) {
            return false
        }
        return scanResult.name in targetList 
    }
}

How can I also add a check that stops once all of the items in the list have been found? I can't just add another predicate, right?

Thanks.

Update: As requested, here is some data to show what I mean.

Let's say that the scanForTargets() and supporting code looks like this:

var emittedList: List<String?> = listOf(null, "0", "1", "2", "3")


fun scanForTargets(): Observable<FoundNumber> = Observable
    .intervalRange(0, emittedList.size.toLong(), 0, 1, TimeUnit.SECONDS)
    .map { index -> FoundNumber(emittedList[index.toInt()]) }

data class FoundNumber(val targetId: String?)

Now if scanForValues was called with a list of 1 and 2, then it should emit back an Observable of 1 and then 2.


Solution

  • No, it is not as simple as adding another filter.

    A possible solution is to use scan to remove items from a set containing your targets, and complete when the set becomes empty.

    Example:

    val targets = listOf("a", "b", "c")
    
    fun scanForTarget(): Observable<String> = Observable.just("a", "b")
    
    fun scanForValues(targets: List<String>): Completable {
        val initial = targets.toMutableSet()
        return scanForTarget()
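                .timeout(5, TimeUnit.SECONDS)  // error if no item arrives within 5 s of the previous one
                .scan(initial) { acc, next -> acc.remove(next); acc }
                .filter { it.isEmpty() }       // passes only once every target has been removed
                .firstOrError()                // finish on the first empty set; error if the source ends first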
                .toCompletable()
    }
    

    Note: a Completable is a special type of publisher that can only signal onComplete or onError.
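
    To see how the accumulator shrinks step by step, here is a minimal sketch of the same idea on a plain list, with no RxJava involved. It assumes Kotlin 1.4+ for runningFold, and remainingAfterEach is a hypothetical helper name used only for illustration. Like scan with an initial value, runningFold yields the initial value first and then one accumulated value per element.

```kotlin
// Plain-collection analogy of the scan step; this is not the RxJava pipeline itself.
// `remainingAfterEach` is a hypothetical name introduced for this sketch.
fun remainingAfterEach(targets: List<String>, emissions: List<String>): List<Set<String>> =
    // Start from the full target set and drop each emission from it.
    // Emissions that are not targets leave the set unchanged.
    emissions.runningFold(targets.toSet()) { remaining, next -> remaining - next }
```

    For example, remainingAfterEach(listOf("1", "2"), listOf("0", "1", "2", "3")) yields five sets, and the first empty one is at index 3, that is, after the third emission. In the Observable version, that is the point where the filter lets a value through.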


    Update: response to question update.

    The new example in your question won't work, because null values are not allowed in RxJava2.

    Assuming you fix that, the following solution may help you.

    fun scanForValues(targets: List<String>): Observable<FoundNumber> {
        val targetSet = targets.toSet()
        val accumulator: Pair<Set<String>, String?> = targetSet to null
        return scanForTarget()
                .timeout(5, TimeUnit.SECONDS)
                .scan(accumulator) { acc, next ->
                    val (set, _) = acc            // the previously matched item is not needed
                    val item = if (next in set) next else null
                    (set - next) to item          // return set and nullable item
                }
                .filter { it.second != null }     // item not null
                .take(targetSet.size.toLong())    // complete once every target has been found
                .map { it.second!! }              // unwrap the item from the pair
                .map { FoundNumber(it) }          // wrap in your class
    }
    

    Instead of using only the Set<String> as the accumulator, the accumulator now also carries the item that was just matched.

    The item is nullable, which lets us check whether a given emission was one of the remaining targets.

    Notice that no null values are passed through the observable flow. In this case null values are wrapped inside Pair<Set<String>, String?> which are never null themselves.
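
    The Pair-based accumulator can be traced on a plain list in the same way. This is only a sketch of the logic, not the RxJava pipeline: it assumes Kotlin 1.4+ for runningFold, and matchedInOrder is a hypothetical helper name used only for illustration.

```kotlin
// Plain-collection analogy of the Pair-based scan; no RxJava involved.
// `matchedInOrder` is a hypothetical name introduced for this sketch.
fun matchedInOrder(targets: List<String>, emissions: List<String>): List<String> {
    val targetSet = targets.toSet()
    val initial: Pair<Set<String>, String?> = targetSet to null
    return emissions
        .runningFold(initial) { (remaining, _), next ->
            // Carry the shrinking set forward and record the match, or null for a miss.
            (remaining - next) to (if (next in remaining) next else null)
        }
        .mapNotNull { it.second }   // drop misses and the initial (null) entry
        .take(targetSet.size)       // keep at most one match per target
}
```

    With targets "1" and "2" and emissions "0", "1", "2", "3", this returns "1" and "2". A repeated emission such as "2", "2", "1" returns "2" and "1", because a target leaves the set the first time it is seen. Unlike this eager list version, take on an Observable also stops listening to the source once the last target arrives.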