Spring Team,
The below producer is able to send the values to a kafka topic successfully.
@Bean
Supplier<Flux<Integer>> someProducer(){
return () -> Flux.range(1, 10);
}
But..how do we get the correlation id of the message produced as we get using ReactiveKafkaSender? Since the flux is subscribed by Spring internally, Is there any way to get?
Currently, the binder does not support getting the complete SenderResult
, only the RecordMetadata
of a successful send.
Please open a bug on GitHub (spring-cloud-stream
) and reference this question.
To get the RecordMetadata
you can use Supplier<Flux<Message<Integer>>>
and set the senderResult
header in the message to an AtomicInteger<Mono<RecordMetadata>>
; it will be populated with a Mono which you can subscribe to.
However, I can see that this is not much use without the correlation metadata.