kotlin, kotlin-coroutines, kotlinx.coroutines.flow, kotlinx.coroutines.channels

How to combine Kotlin flows only when the first is emitting a (not null) value, unsubscribing the second one if it is not?


What I'm trying to achieve is that flowB is (re)subscribed only when flowA emits a value other than null. But there is no standard way to do that (as far as I can see).

In my scenario, it is expensive to keep flowB emitting values when they are not needed, so I need a way to unsubscribe from it while flowA is null and resubscribe once flowA has a (not null) value.

If I use something like takeWhile (testing for null values) before combine, flowA completes on the first negative result and will not receive updates anymore.

With filterNotNull I cannot achieve what I'm trying to do either, because the stream keeps running with the previous values (it does not stop when a null is received).

Does anyone have any suggestions on how to achieve that with the standard operators?

Or should it be done as a new (custom) operator (like combineWhenNotNull, for instance)? Any tips on how to achieve this behavior?

flowA
  .combine(flowB) { a, b ->
    if (a != null) {
      Log.d(TAG, "$a -> $b")
    }
  }

This is the base logic of the stream, but this way flowB keeps emitting values even while flowA is emitting null (which is what I want to avoid).


Solution

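  • Use flatMapLatest. Each time flowA emits, it cancels the flow returned for the previous value, so returning emptyFlow() for null unsubscribes from flowB, and the next non-null value collects flowB again.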

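    flowA
        .flatMapLatest { a ->
            if (a == null) {
                // null: the previous collection of flowB is cancelled, nothing new starts
                emptyFlow()
            } else {
                // non-null: collect flowB again, pairing each b with the current a
                flowB.onEach { b -> Log.d(TAG, "$a -> $b") }
            }
        }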
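
As a rough sketch, the same idea can be wrapped in the combineWhenNotNull operator the question asks about. The operator is hypothetical (not part of kotlinx.coroutines), and the flows, timings, and the runDemo helper below are made up for illustration. The stand-in for flowB counts how often it is collected, so the resubscription is visible:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator (name borrowed from the question); not part of kotlinx.coroutines.
@OptIn(ExperimentalCoroutinesApi::class)
fun <A : Any, B, R> Flow<A?>.combineWhenNotNull(
    other: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = flatMapLatest { a ->
    // A null from the receiver cancels the running collection of `other`;
    // a non-null value starts a fresh collection.
    if (a == null) emptyFlow<R>() else other.map { b -> transform(a, b) }
}

// Demo with made-up flows and timings; returns the collected values and
// how many times the stand-in for flowB was collected.
fun runDemo(): Pair<List<String>, Int> = runBlocking {
    var subscriptions = 0
    // Cold stand-in for the expensive flowB: restarts from 1 on every collection.
    val flowB = flow<Int> {
        subscriptions++
        var i = 0
        while (true) { emit(++i); delay(100) }
    }
    val flowA = flow<String?> {
        emit("x"); delay(250)
        emit(null); delay(250)
        emit("y"); delay(250)
    }
    val results = flowA
        .combineWhenNotNull(flowB) { a, b -> "$a -> $b" }
        .take(5) // flowB never completes on its own, so stop after five values
        .toList()
    results to subscriptions
}

fun main() {
    val (results, subscriptions) = runDemo()
    println(results)
    println("flowB collected $subscriptions times")
}
```

Note that this only stops the expensive work if flowB is cold, or if it is shared with something like SharingStarted.WhileSubscribed. A hot flow that is started eagerly keeps producing values whether or not anyone collects it.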