kotlin, rx-java2, kotlinx.coroutines

Streams of data with only Kotlin Coroutines?


With RxJava I've become accustomed to my repositories returning Observables of data that automatically update whenever there's an underlying change. I achieve this by keeping a subject in my repository that is notified with the relevant change info; observables like getAll() are derived from that subject.

As an example, take this pseudocode-like snippet:

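fun getAll(): Observable<List<Model>> =
    subject
        .filter { isChangeRelevant(it) }   // ignore changes that do not affect this query
        .startWith(initialChangeEvent)     // emit once immediately on subscription
        .map { queryAll() }                // re-run the query for every relevant change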

I've been curious whether, and how, the same thing can be achieved using only coroutines.


Solution

  • You can use Kotlin Coroutines Channels.

    If you only want your values to be emitted as a stream (so you can iterate over them), you can create the channel with produce, which returns a ReceiveChannel:

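    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.produce

    // produce is an extension on CoroutineScope in current kotlinx.coroutines releases,
    // so the function is declared as a scope extension.
    fun CoroutineScope.test(): ReceiveChannel<Int> {
        return produce {
            send(1)
            send(5)
            send(100)
        }
    }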
    

    You can use a for loop (or consumeEach) on the channel returned by test() to receive its values.

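    If you want your channel to behave like an RxJava subject, you can use ConflatedBroadcastChannel and emit values to it. It is closer to BehaviorSubject than to PublishSubject, because each new subscriber first receives the most recently sent value: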

    val broadCastChannel = ConflatedBroadcastChannel<Int>()
    

    You can use broadCastChannel.offer(value) to send values to the channel (in kotlinx.coroutines 1.5 and later, offer is deprecated in favor of trySend(value)).

    To receive values from the channel you can use a simple for-each loop:

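    // Iterating a channel suspends, so this must run inside a coroutine or suspend function.
    for (i in broadCastChannel.openSubscription()) {
        // handle each received value i here
    }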
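
To tie this back to the getAll() example in the question, here is a minimal sketch of a repository whose stream re-runs the query on every relevant change. It is an illustration under stated assumptions, not a definitive implementation: Model, ChangeEvent, ModelRepository, add and demo are hypothetical names, the in-memory list stands in for a real data source, and kotlinx.coroutines 1.6 or later is assumed. It also swaps in MutableSharedFlow (part of kotlinx.coroutines as well) for ConflatedBroadcastChannel, since recent releases deprecate the broadcast channels in favor of SharedFlow and StateFlow.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical types, for illustration only.
data class Model(val id: Int)
data class ChangeEvent(val table: String)

// Hypothetical repository; the list is not thread-safe and only stands in for a database.
class ModelRepository {
    private val models = mutableListOf<Model>()

    // Plays the role of the Rx subject: a hot stream of change notifications.
    // One slot of extra buffer lets tryEmit succeed without suspending.
    private val changes = MutableSharedFlow<ChangeEvent>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    fun add(model: Model) {
        models += model
        changes.tryEmit(ChangeEvent("models"))
    }

    private fun isChangeRelevant(event: ChangeEvent) = event.table == "models"

    private fun queryAll(): List<Model> = models.toList()

    // Mirrors filter / startWith / map from the RxJava version.
    fun getAll(): Flow<List<Model>> =
        changes
            .filter { isChangeRelevant(it) }
            .onStart { emit(ChangeEvent("models")) } // initial event, like startWith
            .map { queryAll() }
}

// Collects the initial snapshot plus one update and returns both.
fun demo(): List<List<Model>> = runBlocking {
    val repo = ModelRepository()
    val seen = mutableListOf<List<Model>>()
    // UNDISPATCHED runs the collector up to its first suspension right away,
    // so it is already subscribed before add() is called below.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        repo.getAll().take(2).collect { seen += it }
    }
    repo.add(Model(1))
    job.join()
    seen
}

fun main() {
    println(demo()) // prints [[], [Model(id=1)]]
}
```

The shared flow here has no replay, so a change sent while nobody is collecting is simply dropped; that is fine in this sketch because every collector starts with a fresh query via onStart. If subscribers should instead receive the latest value itself, as with ConflatedBroadcastChannel, MutableStateFlow is the closer match.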