I have a chain in which I make blocking IO calls (e.g. an HTTP call). I want the blocking call to consume a value and run to completion without being interrupted, drop everything that piles up in the meantime, and then consume the next value in the same manner.
Consider the following example:
fun main() {
    Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
        Thread.sleep(1000) // stands in for the blocking IO call (duration assumed)
        it
    }.blockingForEach { println(it) }
}
From a naive point of view, I would expect it to print something like 0, 10, 20, ..., but it prints 0, 1, 2, ...
What am I doing wrong?
I thought about naively adding debounce to eat up the incoming stream:
fun main() {
    Flowable.interval(100, TimeUnit.MILLISECONDS)
        .debounce(0, TimeUnit.MILLISECONDS)
        .map {
            Thread.sleep(1000) // stands in for the blocking IO call (duration assumed)
            it
        }
        .blockingForEach { println(it) }
}
But now I get a java.lang.InterruptedException: sleep interrupted.
What seems to work is the following:
fun main() {
    Flowable.interval(100, TimeUnit.MILLISECONDS)
        .throttleLast(0, TimeUnit.MILLISECONDS)
        .map {
            Thread.sleep(1000) // stands in for the blocking IO call (duration assumed)
            it
        }
        .blockingForEach { println(it) }
}
The output is as expected: 0, 10, 20, ...
Is that the correct way?
I noticed that throttleLast switches to the computation scheduler. Is there a way to go back to the original scheduler?
I also get an occasional java.lang.InterruptedException: sleep interrupted with that variant.
The simplest approach to solve the problem is:
fun <T> Flowable<T>.lossy() : Flowable<T> {
    return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}
After calling lossy on a Flowable, it drops all elements that arrive faster than the downstream consumer can process them.
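To see the effect, here is a minimal, self-contained sketch that plugs the lossy extension into the chain from the question. It assumes RxJava 3 imports (for RxJava 2 the package would be io.reactivex instead), uses a 1 second Thread.sleep as a stand-in for the blocking call, and introduces a hypothetical helper, sampleLossy, that is not part of the original post.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Same operator chain as the lossy() extension above; the buffer size of 1
// means at most one element waits for the consumer at any time.
fun <T : Any> Flowable<T>.lossy(): Flowable<T> =
    onBackpressureLatest().observeOn(Schedulers.io(), false, 1)

// Hypothetical helper: collects the first `count` values that reach a slow consumer.
fun sampleLossy(count: Long): List<Long> =
    Flowable.interval(100, TimeUnit.MILLISECONDS)
        .lossy()
        .map {
            Thread.sleep(1000) // stands in for the blocking IO call (assumed duration)
            it
        }
        .take(count)
        .toList()
        .blockingGet()

fun main() {
    // The exact values depend on timing; they should be roughly 10 apart.
    println(sampleLossy(3))
}
```

Because the slow map sits downstream of observeOn, it runs on the IO scheduler thread rather than on the timer thread, so the interval keeps ticking while the consumer is busy and only the most recent tick is handed over once the consumer is free again.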