I have a cold Flowable created with the generate method. I want to keep it cold, but I also want to "throttle" the consumer's requests for additional items. For example, if the consumer (the subscriber) takes 400 ms to process an item and throttling is set to 1 second, I would expect a timeline something like this (times in ms):
0 - generate callback called to generate next value
1 - consumer starts processing generated value
401 - consumer finished processing value
401 - consumer requests next item
1000 - generate callback called to generate next value
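To make the rule concrete, here is a rough sketch of the arithmetic I have in mind. It is plain Kotlin with no RxJava involved, `expectedGenerationTimes` is a made-up helper used only for illustration, and it ignores the millisecond or so of hand-off latency shown in the timeline above:

```kotlin
// Hypothetical helper, not part of RxJava: computes when each item should be
// generated under the rule "no sooner than throttleMs after the previous
// generation, and not before the consumer has finished the previous item".
fun expectedGenerationTimes(processingMs: List<Long>, throttleMs: Long): List<Long> {
    val times = mutableListOf<Long>()
    var previousGeneration = 0L
    var consumerFree = 0L
    for ((index, cost) in processingMs.withIndex()) {
        // The first item is generated right away; later ones wait for both conditions.
        val generation =
            if (index == 0) 0L else maxOf(previousGeneration + throttleMs, consumerFree)
        times += generation
        previousGeneration = generation
        consumerFree = generation + cost
    }
    return times
}

fun main() {
    // Fast consumer: the throttle interval dominates.
    println(expectedGenerationTimes(listOf(400L, 400L, 400L), 1000L))   // [0, 1000, 2000]
    // Slow second item: the next generation waits for the consumer instead.
    println(expectedGenerationTimes(listOf(400L, 2200L, 400L), 1000L))  // [0, 1000, 3200]
}
```

So a fast consumer is held back by the 1 second interval, while a slow consumer gets its next item as soon as it asks for it.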
This is the sample code I'm playing with to figure it out:
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit

val startTime = System.currentTimeMillis()

fun log(msg: String) {
    println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
}

val generator = Flowable.generate<Int, Int>(
    Callable { 0 },
    BiFunction { state, emitter ->
        val value = state + 1
        log("generating $value")
        emitter.onNext(value)
        return@BiFunction value
    })

val subscription = generator
    .concatMap { Flowable.concat(Flowable.just(it), Flowable.empty<Int>().delay(1, TimeUnit.SECONDS)) }
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io(), true, 1)
    .subscribe {
        log("processing $it")
        if (it % 5 == 0) {
            log("starting sleep")
            try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
            log("done sleeping")
        }
    }

Thread.sleep(8_000)
subscription.dispose()
log("done")
The output I get currently is this:
RxComputationThreadPool-1 - 278 - generating 1
RxCachedThreadScheduler-1 - 283 - processing 1
RxComputationThreadPool-1 - 285 - generating 2
RxComputationThreadPool-2 - 1287 - generating 3
RxComputationThreadPool-2 - 1288 - generating 4
RxCachedThreadScheduler-1 - 1288 - processing 2
RxCachedThreadScheduler-1 - 2288 - processing 3
RxComputationThreadPool-4 - 3288 - generating 5
RxComputationThreadPool-4 - 3288 - generating 6
RxCachedThreadScheduler-1 - 3288 - processing 4
RxCachedThreadScheduler-1 - 4289 - processing 5
RxCachedThreadScheduler-1 - 4289 - starting sleep
RxComputationThreadPool-6 - 5290 - generating 7
RxComputationThreadPool-6 - 5290 - generating 8
RxCachedThreadScheduler-1 - 6489 - done sleeping
RxCachedThreadScheduler-1 - 6489 - processing 6
RxCachedThreadScheduler-1 - 7490 - processing 7
main - 8265 - done
The output I want to get is something like this:
RxComputationThreadPool-1 - 278 - generating 1
RxCachedThreadScheduler-1 - 283 - processing 1
RxComputationThreadPool-1 - 1285 - generating 2 // 2 generated one second after 1
RxCachedThreadScheduler-2 - 1287 - processing 2 // but once generated, processed immediately
RxComputationThreadPool-2 - 2288 - generating 3 // 3 generated one second after 2
RxCachedThreadScheduler-1 - 2288 - processing 3
RxComputationThreadPool-1 - 3289 - generating 4 // 4 generated one second after 3
RxCachedThreadScheduler-4 - 3289 - processing 4
RxComputationThreadPool-4 - 4290 - generating 5 // 5 generated one second after 4
RxCachedThreadScheduler-1 - 4290 - processing 5
RxCachedThreadScheduler-1 - 4290 - starting sleep // item 5 takes longer to process
RxCachedThreadScheduler-1 - 6491 - done sleeping // 2200ms later it's done
RxComputationThreadPool-6 - 6492 - generating 6 // now that the consumer is done, it requests the next item, which is generated immediately because more than 1 second has passed since the last request
RxCachedThreadScheduler-1 - 6492 - processing 6
RxComputationThreadPool-1 - 7492 - generating 7 // 7 generated one second after 6
RxCachedThreadScheduler-4 - 7493 - processing 7
main - 8265 - done
One of the RxJavaExtensions (RxJava2 version) transformers should solve your problem: spanout(). It inserts a delay between emissions from upstream. I changed just one line in your code (replaced concatMap() with spanout()):
import hu.akarnokd.rxjava2.operators.FlowableTransformers
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit

val startTime = System.currentTimeMillis()

fun log(msg: String) {
    println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
}

val generator = Flowable.generate<Int, Int>(
    Callable { 0 },
    BiFunction { state, emitter ->
        val value = state + 1
        log("generating $value")
        emitter.onNext(value)
        return@BiFunction value
    })

val subscription = generator
    .compose(FlowableTransformers.spanout(1, 1, TimeUnit.SECONDS)) // <-- changed line
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io(), true, 1)
    .subscribe {
        log("processing $it")
        if (it % 5 == 0) {
            log("starting sleep")
            try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
            log("done sleeping")
        }
    }

Thread.sleep(8_000)
subscription.dispose()
log("done")
Produced output:
RxComputationThreadPool-1 - 153 - generating 1
RxCachedThreadScheduler-1 - 1178 - processing 1
RxComputationThreadPool-1 - 1179 - generating 2
RxCachedThreadScheduler-1 - 2176 - processing 2
RxComputationThreadPool-1 - 2177 - generating 3
RxCachedThreadScheduler-1 - 3177 - processing 3
RxComputationThreadPool-1 - 3178 - generating 4
RxCachedThreadScheduler-1 - 4175 - processing 4
RxComputationThreadPool-1 - 4175 - generating 5
RxCachedThreadScheduler-1 - 5177 - processing 5
RxCachedThreadScheduler-1 - 5178 - starting sleep
RxCachedThreadScheduler-1 - 7383 - done sleeping
RxComputationThreadPool-1 - 7384 - generating 6
RxCachedThreadScheduler-1 - 7384 - processing 6
RxComputationThreadPool-1 - 7385 - generating 7
main - 8151 - done
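If it helps when comparing runs, the spacing between generate calls can be read straight off a log like the one above. This is a minimal sketch, assuming the `thread - millis - message` format produced by `log`; `generationGaps` is a hypothetical helper and not part of RxJava or the extensions library:

```kotlin
// Hypothetical helper: pulls the timestamps of "generating" lines out of the
// log format used above ("<thread> - <millis> - <message>") and returns the
// gaps, in milliseconds, between consecutive generations.
fun generationGaps(logLines: List<String>): List<Long> =
    logLines
        .map { line -> line.split(" - ").map { it.trim() } }
        .filter { it.size == 3 && it[2].startsWith("generating") }
        .map { it[1].toLong() }
        .zipWithNext { earlier, later -> later - earlier }

fun main() {
    // Lines copied from the produced output above (non-generating lines are skipped).
    val produced = listOf(
        "RxComputationThreadPool-1 - 153 - generating 1",
        "RxCachedThreadScheduler-1 - 1178 - processing 1",
        "RxComputationThreadPool-1 - 1179 - generating 2",
        "RxComputationThreadPool-1 - 2177 - generating 3",
        "RxComputationThreadPool-1 - 3178 - generating 4",
        "RxComputationThreadPool-1 - 4175 - generating 5",
        "RxCachedThreadScheduler-1 - 7383 - done sleeping",
        "RxComputationThreadPool-1 - 7384 - generating 6",
        "RxComputationThreadPool-1 - 7385 - generating 7"
    )
    println(generationGaps(produced)) // [1026, 998, 1001, 997, 3209, 1]
}
```

The 3209 ms gap covers the 2200 ms sleep, and the final 1 ms gap shows that item 7 was generated as soon as item 6 had been handed to the consumer.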