Tags: android, kotlin, rx-java, kotlin-coroutines

PublishSubject with Kotlin coroutines (Flow)


I used a PublishSubject to send messages and to listen for results. It worked flawlessly, but I'm not sure how to do the same thing with Kotlin coroutines (flows or channels).

private val subject = PublishProcessor.create<Boolean>()

...

fun someMethod(b: Boolean) {
    subject.onNext(b)
}

fun observe() {
    subject.debounce(500, TimeUnit.MILLISECONDS)
           .subscribe { /* value received */ }
}

Since I need the debounce operator, I wanted to do the same thing with flows. I created a channel, built a flow from it, and tried to listen for changes, but I'm not getting any results.

private val channel = Channel<Boolean>()

...

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
    flow {
         channel.consumeEach { value ->
            emit(value)
         }
    }.debounce(500, TimeUnit.MILLISECONDS)
    .onEach {
        // value received
    }
}

What is wrong?


Solution

  • Flow is a cold asynchronous stream, just like an Observable.

    Transformations on a flow, such as map and filter, do not trigger collection or execution; only terminal operators (e.g. collect or single) do.

    The onEach operator is only a transformation, so in your code nothing ever collects the flow. Replace it with the terminal operator collect. You could also use a BroadcastChannel for cleaner code:

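    import kotlinx.coroutines.channels.BroadcastChannel
    import kotlinx.coroutines.flow.*

    // Broadcast channel with a buffer capacity of 1.
    private val channel = BroadcastChannel<Boolean>(1)

    suspend fun someMethod(b: Boolean) {
        channel.send(b)
    }

    suspend fun observe() {
        channel
            .asFlow()
            .debounce(500) // timeout in milliseconds
            .collect {
                // value received
            }
    }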
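
To see why onEach on its own delivered nothing, the cold-stream behaviour described above can be checked in isolation. This is a minimal sketch, not part of the original answer: coldFlowDemo is a hypothetical helper name, and launchIn is shown as one way to keep onEach while still starting collection in a scope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records how many values onEach has seen at three points.
fun coldFlowDemo(): List<Int> = runBlocking {
    val seen = mutableListOf<Boolean>()
    val sizes = mutableListOf<Int>()

    // Building the chain runs nothing: the flow is cold and onEach is intermediate.
    val pipeline = flowOf(true, false).onEach { seen.add(it) }
    sizes.add(seen.size)

    // Option 1: a suspending terminal operator triggers collection.
    pipeline.collect()
    sizes.add(seen.size)

    // Option 2: keep onEach and start collection in a scope with launchIn.
    pipeline.launchIn(this).join()
    sizes.add(seen.size)

    sizes
}

fun main() {
    println(coldFlowDemo()) // [0, 2, 4]
}
```

Each terminal operation re-runs the cold flow from the start, which is why the count grows by two per collection.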
    

    Update: When the question was asked, debounce had an overload that took two parameters (as in the question). That overload no longer exists; there is now one that takes a single timeout in milliseconds (Long).
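
In current kotlinx.coroutines releases BroadcastChannel is deprecated in favor of SharedFlow, so a closer present-day analogue of a PublishSubject is probably MutableSharedFlow. The following is a minimal sketch under that assumption. The class name ToggleEvents, the onValue callback, and the buffer settings are illustrative choices, not part of the original answer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical holder class, named for illustration only.
class ToggleEvents {
    // replay defaults to 0, so late subscribers see no past values (like PublishSubject).
    // One extra buffer slot with DROP_OLDEST lets tryEmit succeed without suspending.
    private val events = MutableSharedFlow<Boolean>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // Non-suspending, so it can be called from ordinary code such as click listeners.
    fun someMethod(b: Boolean) {
        events.tryEmit(b)
    }

    // Suspends until cancelled: a shared flow never completes on its own.
    @OptIn(FlowPreview::class)
    suspend fun observe(onValue: (Boolean) -> Unit) {
        events
            .debounce(500) // timeout in milliseconds
            .collect { onValue(it) }
    }
}

fun main() = runBlocking {
    val toggles = ToggleEvents()
    val job = launch { toggles.observe { println("received $it") } }
    delay(100) // give the collector time to subscribe; earlier values would be lost
    toggles.someMethod(true)
    toggles.someMethod(false)
    toggles.someMethod(true)
    delay(1000) // longer than the debounce window, so the last value is delivered
    job.cancel()
}
```

Because someMethod no longer needs to be a suspend function, this version sits closer to the original subject.onNext(b) call. Dropping the oldest buffered value is an assumption that suits debounce, where only the most recent value matters.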