I know that if I implement SC Stream Function in Spring reactive way, I can't send DLQ like traditional imperative way.
To do this, I am trying to manually change the Destination in the MessageHeader so that the message goes to the DLQ Topic.
However, if an error occurs, I want to send a message to the zombie Topic using the onErrorContinue method, but that doesn't work.
Shouldn't I produce it in onErrorContinue?
input.flatMap { sellerDto ->
// do something..
}.onErrorContinue { throwable, source ->
log.error("error occured!! ${throwable.message}")
source as Message<*>
Flux.just(MessageBuilder.withPayload(String(source.payload as ByteArray)).setHeader("spring.cloud.stream.sendto.destination","zombie").build())
}
No, since with reactive function the unit of work is the entire stream (not individual item). I provide more details here.
That said, you can inject StreamBridge
which is always available as a bean and use it inside of your filter
or any other applicable reactive operator. Basically streamBridge.send("destination", Message)