
Does the Kotlin RX .zipWith function body execute once, or once per observer?


When data is transmitted to my app, it follows a sequence of:

  • 1 ReadStart packet with ID information
  • 1 or more DataPackets that are combined to form the payload
  • 1 ReadDone packet to signal the transfer is done

I have a Kotlin RX function that creates a Flowable:

val readStartPublishProcessor: PublishProcessor<ReadStartPacket>
val dataPacketPublishProcessor: PublishProcessor<DataPacket>
val readDonePublishProcessor: PublishProcessor<ReadDonePacket>
...

private fun setupReadListener(): Flowable<ReadEvent> {
    val dataFlowable = dataPacketPublishProcessor.buffer(readDonePublishProcessor)
    
    return readStartPublishProcessor
        .zipWith(other = dataFlowable) { readStart, dataPackets ->
            Log.d(tag, "ReadEvent with ${dataPackets.size} data packets")
            ReadEvent(event = readStart, payload = combinePackets(dataPackets))
        }
}

From reading the documentation of .zipWith, I expect the .zipWith function body to execute once for each pair of values emitted from the readStartPublishProcessor and dataFlowable, and then pass that computed result to each subscriber:

The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable. ... It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items.

But if I have more than one observer, the .zipWith function body executes once per observer, each time with the same pair of emitted values. This is a problem because the function called from within the .zipWith function body has side effects. (Note: neither the .share nor the .replay operator is used in the observers.)

Why does it seem to run the .zipWith function body for each observer rather than just once, and is there a way to write this so that it executes only once regardless of the number of observers?


Solution

  • A couple of points...

    The function called from within zipWith should not contain side effects. It should be a pure function. If you absolutely need side effects, put them in one of the do operators (for example doOnNext) rather than in the zipWith function body.

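    The observable returned from zipWith is cold (observables are cold by default). This means that every observer gets its own execution context: the operator subscribes to its source observables each time it is subscribed to, and calls its function block separately for each subscription. That the sources here are hot PublishProcessors does not change this, because each subscription to the zipped Flowable still sets up its own zip and runs the function body itself.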

    If you want subscriptions to share an execution context, you must use the share operator, or publish followed by refCount (share is shorthand for publish().refCount()). For background, read up on hot and cold Observables.
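
    The difference can be sketched roughly as below. This is a minimal illustration, not the asker's actual code: it assumes RxJava 3 (the io.reactivex.rxjava3 packages) called directly from Kotlin, and the zipperCalls helper, the String/Int element types, and the two stand-in processors are hypothetical names introduced only for the demo. It counts how many times the zipWith function body runs for a single emitted pair, with and without share().

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.processors.PublishProcessor
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns how often the zipper body runs for ONE emitted pair
// when `observerCount` subscribers are attached, with or without share().
fun zipperCalls(observerCount: Int, shared: Boolean): Int {
    // Stand-ins (assumed types) for readStartPublishProcessor and the buffered data packets.
    val starts = PublishProcessor.create<String>()
    val payloads = PublishProcessor.create<Int>()
    val calls = AtomicInteger()

    val zipped: Flowable<String> = starts.zipWith(
        payloads,
        BiFunction { start: String, payload: Int ->
            calls.incrementAndGet() // counts executions of the zipper body
            "$start:$payload"
        }
    )

    // Cold: every subscriber gets its own zip. Shared: one zip is multicast to all.
    val events = if (shared) zipped.share() else zipped

    repeat(observerCount) { i ->
        events.subscribe { println("observer $i got $it") }
    }

    // Emit exactly one pair after all observers have subscribed.
    starts.onNext("readStart")
    payloads.onNext(3)
    return calls.get()
}

fun main() {
    println("cold:   ${zipperCalls(observerCount = 2, shared = false)}") // cold:   2
    println("shared: ${zipperCalls(observerCount = 2, shared = true)}")  // shared: 1
}
```

    In the question's setupReadListener, the equivalent change would be to append .share() to the returned chain, with any side effects moved into a doOnNext placed before it. Note that share() does not replay: an observer that subscribes after a pair has already been emitted will not see that pair.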