Search code examples
javaspringspring-kafka

Spring Kafka single producer for multiple topics


I want to use a single producer for writing JSON objects to multiple topics.

The following code is doing what I want but it feels wrong to use the setDefaultTopic() method to tell the KafkaTemplate to which topic it should send the message.

If I use the send(String topic, ? payload) method than the StringJsonMessageConverter won't work.

My Producer:

public class MyProducer {

    @Autowired
    private KafkaTemplate<String, ?> kafka;

    public void send(String topic, Message<?> message) {
        kafka.setDefaultTopic(topic);
        kafka.send(message);
    }
}

And my configuration:

@Configuration
public class MyProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        ...

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());

        return kafkaTemplate;
    }
}

Any suggestions on how to do this properly?

UPDATE

I changed the code to this ...

Producer:

public void send(Message<?> message) {
    kafka.send(message);
}

In my controller (where I create the message objects);

MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);

The MessageHeaders object will still contain the id and timestamp.


Solution

  • When using the send(Message<?>) variant, the message converter expects the topic to be in a message header...

    String topic = headers.get(KafkaHeaders.TOPIC, String.class);
    

    If you can determine the topic some other way, you need to create a custom converter.

    Changing the default topic is not thread-safe.