Search code examples
spring-bootspring-kafkaspring-cloud-stream

How to send keyed message to Kafka using Spring Cloud Stream Supplier


I want to use Spring Cloud Stream to produce keyed (message with specific key) messages to Kafka.

@SpringBootApplication
public class SpringCloudStreamKafkaApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
  }

  @Bean
  Supplier<DataRecord> process(){
    return () -> new DataRecord(42L);
  }

}

What do I need to change in the Supplier code to provide key? Is it possible in new style of API (using lambdas)?

Thank you


Solution

  • Return a Message<?> and set the KafkaHeaders.MESSAGE_KEY header:

    @Bean
    Supplier<Message<String>> process() {
        return () -> MessageBuilder.withPayload("foo")
                .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
                .build();
    }
    

    (assumes the default key serializer (byte[]).

    EDIT

    This will be called endlessly.

    If you want to send a finite stream, I believe you have to switch to the reactive model.

    @Bean
    Supplier<Flux<Message<String>>> processFinite() {
        Message<String> msg1 = MessageBuilder.withPayload("foo")
                .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
                .build();
        Message<String> msg2 = MessageBuilder.withPayload("baz")
                .setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
                .build();
        return () -> {
            return Flux.just(msg1, msg2);
        };
    }
    

    There is also Flux.fromStream(myStream).

    Which will end at the end of the stream.

    EDIT2

    You can also use the StreamBridge.

    https://docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources