I'm using WebClient to consume an infinite stream of events. This code seemed to work fine in tests:
webClient.get()
.uri("/events")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(EventStreamBatch::class.java)
.retry()
.subscribe(
{ batches ->
eventsConsumer.accept(batches.events.map { it.toInternalEvent() })
},
{ error ->
log.error(error)
}
)
Now I also want to extract a header from the response which I need to use while consuming the events. I tried to change bodyToFlux()
to toEntityFlux()
so I can get both the headers and the body. But I'm confused how to consume the body stream, while getting the header value at the same time.
...
.retrieve()
.toEntityFlux(EventStreamBatch::class.java)
.subscribe { entity: ResponseEntity<Flux<EventStreamBatch>> ->
val header = entity.headers["my-header"]?.firstOrNull()
// How to consume the `entity.body` which is of type Flux<EventStreamBatch> ?
// Create another `.subscribe()` here? Nested subscribe() seems incorrect, there must be a better way.
}
Not sure if it's a good solution, but I ended up creating a flux of pair of body and the header, using flatMapMany()
:
webClient.get()
.uri("/events")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.toEntityFlux(EventStreamBatch::class.java)
.flatMapMany { entity ->
val header = entity.headers["my-header"]?.firstOrNull()
return@flatMapMany when (entity.body) {
null -> Flux.empty<Pair<EventStreamBatch, String>>()
else -> entity.body!!.map { Pair(it, header) }
}
}
.subscribe { (batch, header) ->
// have both the body and the header here now
}