In RxJava 2, Flowable supports several backpressure strategies. The most interesting are BUFFER, DROP, and LATEST, which are respected throughout the whole Rx chain.
Kotlin has Flow, which declares that it supports backpressure out of the box. I was able to get the BUFFER and LATEST strategies with Flow by using the following:
flow
    .buffer() // BUFFER
    .collect { ... }
flow
    .conflate() // LATEST
    .collect { ... }
The latter, conflate, is just a shortcut for that same buffer operator.
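For readers who want to see the two strategies side by side, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The helper names `numbers` and `collectSlowly` and the delay values are made up for illustration. With `buffer()` the slow collector eventually sees every value, while with `conflate()` it may skip intermediate values but still ends on the latest one.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer: emits 0..4 faster than the collector can handle them.
fun numbers(): Flow<Int> = flow {
    for (i in 0 until 5) {
        emit(i)
        delay(10)
    }
}

// Hypothetical slow collector: records every value it actually receives.
suspend fun collectSlowly(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collect {
        delay(100) // simulate slow processing
        seen += it
    }
    return seen
}

fun main() = runBlocking {
    println(collectSlowly(numbers().buffer()))   // BUFFER-like: nothing is lost
    println(collectSlowly(numbers().conflate())) // LATEST-like: intermediate values may be skipped
}
```

Neither operator gives DROP semantics: `buffer()` keeps everything and `conflate()` keeps the newest value rather than discarding it.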
But I was not able to find anything that works the same as DROP. In short, DROP drops any value that arrives in the stream while the previous value hasn't been processed yet. With Flow, I'm not even sure that it is possible at all.
Consider this case:
flow
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }
So backpressureDrop should respect any work done further down the stream, yet the operator knows nothing about what is happening below it without an explicit callback from the bottom, like the "request" method in an RxJava Subscriber. It also should not pass any event through before the previous item has been collected. That is why it doesn't seem possible.
Is there a ready-made operator that I'm missing, or is there a straightforward way to implement something like this with the existing API?
We can build this using a Flow backed by a Rendezvous Channel.
When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.
A rendezvous channel has no buffer, so a consumer must already be suspended and waiting for the next element before an element can be sent to the channel. We can exploit this to drop values that cannot be accepted without suspending, by using Channel.offer, which is a normal non-suspending function.
Adds element into this queue if it is possible to do so immediately without violating capacity restrictions and returns true. Otherwise, it returns false immediately or throws exception if the channel isClosedForSend (see close for details).
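The rendezvous behaviour is easy to check in isolation. The sketch below assumes kotlinx.coroutines 1.5 or newer, where `trySend` is the non-suspending replacement for the deprecated `offer`; the function name `rendezvousDemo` is made up for this illustration. A non-suspending send fails while nobody is waiting in `receive`, and succeeds once a receiver is suspended there.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: returns (sent without receiver?, sent with receiver?, value received).
fun rendezvousDemo(): Triple<Boolean, Boolean, Int> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)

    // Nobody is waiting in receive() yet, so the element cannot be handed over.
    val withoutReceiver = channel.trySend(1).isSuccess

    // Start a receiver and yield so that it runs until it suspends in receive().
    val receiver = async { channel.receive() }
    yield()

    // A receiver is now waiting, so the hand-over can happen immediately.
    val withReceiver = channel.trySend(2).isSuccess

    Triple(withoutReceiver, withReceiver, receiver.await())
}

fun main() {
    println(rendezvousDemo())
}
```

The first element is simply lost, which is exactly the property a DROP strategy needs.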
Because channelFlow is buffered by default, we need to apply Flow<T>.buffer downstream with a capacity of 0.
/**
 * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
 * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and
 * waiting for the next element, the element is dropped.
 *
 * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
 */
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
    // offer is deprecated since kotlinx.coroutines 1.5; trySend(it) is its replacement
    collect { offer(it) }
}.buffer(capacity = 0)
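For newer library versions, the same idea can be written with `trySend`. This is a sketch under stated assumptions rather than a drop-in replacement: it assumes kotlinx.coroutines 1.5 or newer, the names `dropWhenBusy` and `slowConsumerDemo` are hypothetical (the former chosen to avoid confusion with the built-in `Flow.drop(count)`), and the delay values are arbitrary.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical operator: forwards an element only if the collector is already waiting.
fun <T> Flow<T>.dropWhenBusy(): Flow<T> = channelFlow {
    // The result of trySend is ignored on purpose: a failed attempt means the element is dropped.
    collect { trySend(it) }
}.buffer(Channel.RENDEZVOUS) // capacity 0, so there is no buffer between producer and collector

// Hypothetical demo: a fast producer and a slow consumer; returns what the consumer saw.
fun slowConsumerDemo(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    flow {
        repeat(20) {
            emit(it)
            delay(50) // producer emits roughly every 50 ms
        }
    }.dropWhenBusy().collect {
        delay(200) // consumer needs roughly 200 ms per element
        received += it
    }
    received
}

fun main() {
    println(slowConsumerDemo())
}
```

Which elements survive depends on timing, so the printed list can vary between runs. It should stay in emission order and contain fewer elements than were emitted.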
Here's an example of how a slow consumer can use this to drop elements.
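fun main() = runBlocking {
    flow {
        (0..100).forEach { emit(it); delay(100) } // body assumed: emit every 100 ms
    }.drop().collect {
        delay(1000) // body assumed: a slow consumer
        println(it)
    }
}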
with the corresponding output: