Search code examples
javakotlinrx-javarx-java2

Rx Java: Sleep before OnNext (Sleep before emitting from Observable)


Basis a condition in my Observable, I want to delay onNext / onError. My code is as follows:

 fun check3(){
        val list = arrayListOf(1,2,3,4,5,6,7, null)
        val obs = Observable.create<Int> { subscriber ->
           list.filter {
                it != null
            }.map {
                if (it!! %2 == 0 ) {
                    Thread.sleep(3000)
                    subscriber.onError(IllegalArgumentException("Mod is true"))
                } else {
                    subscriber.onNext(it)
                    subscriber.onComplete()
                }
            }
        }
    }

A sore here being Thread.sleep(3000)

Is there a better way of doing this? Basically I want to delay the onError notification to my subscriber if the if(it %2) condition is met


Solution

  • You can use concatMap to turn the sleep into a non-blocking delay:

    Observable.fromIterable(list.filter { it != null })
    .concatMap {
        if (it!! % 2 == 0) {
            return@concatMap Observable.error(IllegalArgumentException("Mod is true"))
                             .delay(3, TimeUnit.SECONDS, true)
        }
        Observable.just(it)
    }
    .take(1)