I want to use Spring Cloud Stream to produce keyed messages (messages with a specific key) to Kafka.
@SpringBootApplication
public class SpringCloudStreamKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
    }

    @Bean
    Supplier<DataRecord> process() {
        return () -> new DataRecord(42L);
    }
}
What do I need to change in the Supplier code to provide a key? Is it possible with the new functional style of the API (using lambdas)?
Thank you
Return a Message<?> and set the KafkaHeaders.MESSAGE_KEY header:
@Bean
Supplier<Message<String>> process() {
    return () -> MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
}
(This assumes the default key serializer, which expects byte[].)
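Because the key travels as raw bytes, it may be worth pinning the charset: "bar".getBytes() with no argument uses the JVM's default charset, which on older JVMs depends on the platform, and Kafka's default partitioner hashes the key bytes. A minimal JDK-only sketch follows; KeyBytes, toKeyBytes and fromKeyBytes are hypothetical helper names, not part of Spring or Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Spring or Kafka.
final class KeyBytes {
    private KeyBytes() {}

    // Encode the key with a fixed charset so every producer yields the same bytes.
    static byte[] toKeyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same charset, e.g. on the consumer side.
    static String fromKeyBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

In the supplier above, the header line would then read .setHeader(KafkaHeaders.MESSAGE_KEY, KeyBytes.toKeyBytes("bar")).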
EDIT
This supplier will be called endlessly: the framework polls it repeatedly (once per second by default).
If you want to send a finite stream, I believe you have to switch to the reactive model.
@Bean
Supplier<Flux<Message<String>>> processFinite() {
    Message<String> msg1 = MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
    Message<String> msg2 = MessageBuilder.withPayload("baz")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
            .build();
    return () -> Flux.just(msg1, msg2);
}
There is also Flux.fromStream(myStream), which completes when the stream ends.
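One thing to keep in mind with Flux.fromStream(myStream): a java.util.stream.Stream can be consumed only once, so if the Flux could be subscribed more than once, the overload that takes a Supplier of streams is probably the safer choice. The JDK-only sketch below illustrates the single-use behaviour. FiniteStreamDemo and its methods are hypothetical names, and no Reactor code is involved.

```java
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class for illustration; uses only the JDK.
final class FiniteStreamDemo {
    private FiniteStreamDemo() {}

    // Hands out a fresh finite stream on every call.
    static Supplier<Stream<String>> payloads() {
        return () -> Stream.of("foo", "baz");
    }

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondUseFails() {
        Stream<String> stream = payloads().get();
        stream.collect(Collectors.toList()); // first use consumes the stream
        try {
            stream.collect(Collectors.toList()); // second use is not allowed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

Calling payloads().get() twice yields two independent streams, which is the behaviour a supplier-based overload relies on.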
EDIT2
You can also use the StreamBridge to send messages on demand instead of from a polled Supplier.