Tags: spring-webflux, spring-cloud-stream, spring-cloud-stream-binder-kafka

Is there a way to dynamically change the destination in a Spring Cloud Stream reactive function?


I know that if I implement a Spring Cloud Stream function in the reactive style, I can't send failed messages to a DLQ the way I can in the traditional imperative style.

To work around this, I am trying to manually change the destination in the message headers so that the message goes to the DLQ topic.

Specifically, when an error occurs, I want to send the message to the zombie topic from the onErrorContinue method, but that doesn't work.

Is onErrorContinue the wrong place to produce it?

input.flatMap { sellerDto ->
    // do something..
}.onErrorContinue { throwable, source ->
    log.error("error occurred!! ${throwable.message}")
    source as Message<*>
    // This Flux is only the lambda's last expression; onErrorContinue takes a consumer, so the value is discarded.
    Flux.just(MessageBuilder.withPayload(String(source.payload as ByteArray)).setHeader("spring.cloud.stream.sendto.destination", "zombie").build())
}

Solution

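  • No. With a reactive function, the unit of work is the entire stream, not an individual item. The framework connects the stream once and then has no per-message hook for error handling or routing, so the Flux you build inside onErrorContinue is never emitted anywhere. I provide more details here.

    That said, you can inject StreamBridge, which is always available as a bean, and use it inside your filter or any other applicable reactive operator. Basically: streamBridge.send("destination", message)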
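As a rough illustration of the StreamBridge suggestion, here is a minimal Kotlin sketch. It assumes the failing work happens per element inside flatMap, so the error can be caught on the inner Mono and the outer stream keeps running. The names processWithDeadLetter, SellerStreamConfig, and the x-exception-message header are made up for this example, and the handler body is only a placeholder for the real "do something" step. Only StreamBridge.send(destination, message) and the "zombie" destination come from the discussion above.

```kotlin
import java.util.function.Function
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical helper: runs `handler` on each message and, when it throws,
// hands the original payload to `send` for the dead-letter destination.
fun processWithDeadLetter(
    input: Flux<Message<ByteArray>>,
    deadLetterDestination: String,
    send: (String, Message<*>) -> Boolean,
    handler: (Message<ByteArray>) -> Message<String>
): Flux<Message<String>> =
    input.flatMap { message ->
        Mono.fromCallable { handler(message) }
            .onErrorResume { error ->
                // Build the dead-letter message from the original payload.
                // "x-exception-message" is an illustrative header name, not a framework one.
                val deadLetter = MessageBuilder
                    .withPayload(String(message.payload))
                    .setHeader("x-exception-message", error.message ?: "unknown")
                    .build()
                send(deadLetterDestination, deadLetter)
                // Emit nothing for the failed element; the outer Flux continues.
                Mono.empty<Message<String>>()
            }
    }

@Configuration
class SellerStreamConfig {

    @Bean
    fun process(streamBridge: StreamBridge): Function<Flux<Message<ByteArray>>, Flux<Message<String>>> =
        Function { input ->
            processWithDeadLetter(
                input,
                "zombie",
                { destination, message -> streamBridge.send(destination, message) }
            ) { message ->
                // Placeholder for the real per-message work.
                MessageBuilder.withPayload(String(message.payload)).build()
            }
        }
}
```

Passing the send step in as a function keeps the routing logic easy to exercise without a running binder. Note that StreamBridge.send is a blocking call, so under load it may be worth moving it onto a suitable scheduler.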