spring-boot kotlin events spring-webflux spring-webclient

How to extract header from webflux while consuming the event stream body?


I'm using WebClient to consume an infinite stream of events. This code seemed to work fine in tests:

webClient.get()
    .uri("/events")
    .accept(MediaType.TEXT_EVENT_STREAM)
    .retrieve()
    .bodyToFlux(EventStreamBatch::class.java)
    .retry()
    .subscribe(
        { batches ->
            eventsConsumer.accept(batches.events.map { it.toInternalEvent() })
        },
        { error ->
            log.error(error)
        }
    )

Now I also want to extract a header from the response, which I need while consuming the events. I tried changing bodyToFlux() to toEntityFlux() so that I get both the headers and the body, but I'm not sure how to consume the body stream while also having the header value available.

...
    .retrieve()
    .toEntityFlux(EventStreamBatch::class.java)
    .subscribe { entity: ResponseEntity<Flux<EventStreamBatch>> ->

        val header = entity.headers["my-header"]?.firstOrNull()

        // How to consume the `entity.body` which is of type Flux<EventStreamBatch> ?
        // Create another `.subscribe()` here? Nested subscribe() seems incorrect, there must be a better way.
    }

Solution

  • I'm not sure it's the best solution, but I ended up building a Flux of (body, header) pairs using flatMapMany():

    webClient.get()
        .uri("/events")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .toEntityFlux(EventStreamBatch::class.java)
        .flatMapMany { entity ->
            // Read the header once, before the body starts streaming
            val header = entity.headers["my-header"]?.firstOrNull()
            // Pair each batch with the (nullable) header; emit nothing if there is no body
            entity.body?.map { batch -> Pair(batch, header) } ?: Flux.empty()
        }
        .subscribe { (batch, header) ->
            // have both the body and the header here now
        }