
How to get correlation id in Spring Cloud Stream


Spring Team,

The producer below successfully sends values to a Kafka topic.

@Bean
Supplier<Flux<Integer>> someProducer(){
    return () -> Flux.range(1, 10);
}

But how do we get the correlation ID of each produced message, as we can when using ReactiveKafkaSender? Since Spring subscribes to the flux internally, is there any way to get it?


Solution

  • Currently, the binder does not support getting the complete SenderResult, only the RecordMetadata of a successful send.

    Please open a bug on GitHub (spring-cloud-stream) and reference this question.

    To get the RecordMetadata, you can use Supplier<Flux<Message<Integer>>> and set the senderResult header in each message to an AtomicReference<Mono<RecordMetadata>>; it will be populated with a Mono, which you can subscribe to.

    There's a test here: https://github.com/spring-cloud/spring-cloud-stream/blob/29c3cd7cddf9b853c57fca2b2118f1b64e5dde30/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java#L315-L323

    However, I can see that this is not of much use without the correlation metadata.
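
A minimal sketch of the approach described above, under a few assumptions: the header name "senderResult" is copied from the answer text and should be checked against the linked test; the class name ProducerConfig and the pendingResults queue are hypothetical names used only for illustration. The holder stays empty until the binder has handled the message, so the application has to keep a reference to it somewhere and read it later.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
class ProducerConfig {

    // Assumption: header name as written in the answer; verify it against the linked binder test.
    private static final String SENDER_RESULT_HEADER = "senderResult";

    // Hypothetical holding area so application code can find the holders later.
    private final Queue<AtomicReference<Mono<RecordMetadata>>> pendingResults =
            new ConcurrentLinkedQueue<>();

    @Bean
    Supplier<Flux<Message<Integer>>> someProducer() {
        return () -> Flux.range(1, 10).map(value -> {
            // One empty holder per message; per the answer, it is populated with a Mono on send.
            AtomicReference<Mono<RecordMetadata>> holder = new AtomicReference<>();
            pendingResults.add(holder);
            return MessageBuilder.withPayload(value)
                    .setHeader(SENDER_RESULT_HEADER, holder)
                    .build();
        });
    }

    // Exposes the holders; holder.get() returns null until the holder has been populated.
    Queue<AtomicReference<Mono<RecordMetadata>>> pendingResults() {
        return pendingResults;
    }
}
```

Once a holder has been populated, subscribing to holder.get() should yield the RecordMetadata (topic, partition, offset) of a successful send. As the answer notes, this does not carry any correlation metadata.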