Search code examples
spring-webfluxproject-reactorspring-reactivespring-reactor

Generate elements applying a function to two consecutive elements of a Reactor Flux


I have a Flux that will emit some elements, for the sake of simplicity suppose a Flux<String> -> "1", "2", "3", "n"

What I need to do is get two consecutive elements and apply an operation (e.g. flatMap) that will generate some elements from them. Again let's suppose that the function concats the first element with the second and the second with the first one:

f(x,y) -> "xy", "yx"

So the final sequence emitted from the original Flux should be:

"12" - "21" - "23" - "32" - "3n" - "n3"

How could this be done?


Solution

  • Solved using buffer(2, 1):

    fun main() {
        val flux = listOf("1", "2", "3", "4").toFlux()
        flux.buffer(2, 1)
                .flatMap {
                    if (it.size == 2) {
                        listOf(it[0] + it[1], it[1] + it[0]).toFlux()
                    } else {
                        Flux.empty()
                    }
                }.doOnNext { println(it) }
                .subscribe()
    }