Search code examples
kotlinrx-javarx-kotlinrx-kotlin2rx-javafx

RxJava/RxKotlin split stream depending on subtype


I have a stream of ResponseMessage which can be of different subtypes. I would like to split the stream into streams where I can handle each type in its own stream.

My first try resulted in this which I can not see working out.

file.readLines()
        .toObservable()
        .map { mapper.readValue(it, ResponseMessage::class.java) }
        .groupBy { when(it) {
            is MarketChangeMessage -> it::class
            else -> it::class
        }}
        .map { it.????? } //How can possible this work?

My question is now: What is the idiomatic way to divide a stream into streams on one specific sub type?


Solution

  • You could use the ofType operator:

    ofType( ) — emit only those items from the source Observable that are of a particular class.

    Example:

    val messages = file.readLines()
        .toObservable()
        .map { mapper.readValue(it, ResponseMessage::class.java) }
        .share() // <-- or other multicasting operator
    
    messages
        .ofType(MarketChangeMessage::class)
        .subscribe()
    
    messages
        .ofType(Other::class)
        .subscribe()