Search code examples

How to get correlation id in Spring Cloud Stream

Spring Team,

The below producer is able to send the values to a kafka topic successfully.

Supplier<Flux<Integer>> someProducer(){
    return () -> Flux.range(1, 10);
} do we get the correlation id of the message produced as we get using ReactiveKafkaSender? Since the flux is subscribed by Spring internally, Is there any way to get?


  • Currently, the binder does not support getting the complete SenderResult, only the RecordMetadata of a successful send.

    Please open a bug on GitHub (spring-cloud-stream) and reference this question.

    To get the RecordMetadata you can use Supplier<Flux<Message<Integer>>> and set the senderResult header in the message to an AtomicInteger<Mono<RecordMetadata>>; it will be populated with a Mono which you can subscribe to.

    There's a test here:

    However, I can see that this is not much use without the correlation metadata.