Tags: java, apache-kafka, spring-kafka

Kafka Spring Boot - send topicName to producer on the fly


I'm writing an application in which I want to send a message into a given topic if it exists and if not, create a new topic and then send my message.

The problem is that I want the new topic name to be based on the data I'm sending into that topic, so I need to be able to pass the topicName to the producer from my service class (below) on the fly.

My config class looks like this:

@EnableKafka
@Configuration
class TimeOnTaskConfig {

    @Bean
    public NewTopic TimeOnTaskQueue() {
        return TopicBuilder.name("testing123")
            .partitions(1)
            .replicas(1)
            .build();
    }

    @Bean
    public Producer<TaskEvent> timeOnTaskProducer(final KafkaTemplate<String, TaskEvent> template) {
        return new Producer<>(
            Producer.Config.builder()
                .topicName("testing123")
                .build(),
            template);
    }

   
    @Bean
    public KafkaTemplate<String, TaskEvent> kafkaTemplate(
        final ProducerFactory<String, TaskEvent> pf) {
        return new KafkaTemplate<>(pf);
    }
}

but instead of hard-coding "testing123", I'd like to be able to pass a topicName parameter. However, when I add a String parameter to the timeOnTaskProducer definition, I get a Could not autowire. No beans of 'String' type found. error message.

How should I go about sending the topicName as a parameter from my service? My service for reference:

@Service
@Slf4j
public class TimeOnTaskService {

    private final Producer<TaskEvent> taskProducer;

    public TimeOnTaskService(final Producer<TaskEvent> taskProducer) {
        this.taskProducer = taskProducer;
    }

    public void enqueue(final TaskEvent event) {
        try {
            // I'd like to be able to do something here like:
            // taskProducer.setTopicName("section" + event.getID());
            taskProducer.send(event);
        } catch (Exception e) {
            log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
                e.getMessage()));
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

and my producer:

public class Producer<Req> {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final KafkaTemplate<String, Req> template;
    private final Config config;

    public Producer(Config config, KafkaTemplate<String, Req> template) {
        this.config = config;
        this.template = template;
    }

    public void send(final TaskEvent requestStub) throws ExecutionException, InterruptedException, TimeoutException {
        logger.info("{}::send()", getClass().getCanonicalName());

        final ProducerRecord<String, TaskEvent> record = new ProducerRecord<>(config.topicName, requestStub);
        ListenableFuture future = template.send((ProducerRecord<String, Req>) record);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {

            @Override
            public void onSuccess(SendResult<String, Req> result) {
                logger.info("Sent message=[" + requestStub +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to send message=["
                    + requestStub + "] due to : " + ex.getMessage());
            }
        });
    }


    @Getter
    @Builder
    @ToString
    public static class Config {
        private final String topicName;
        private final int sendTimeout;
    }
}

Solution

  • There is no need to create a new producer for every topic you want to publish to; you can simply use the KafkaTemplate's send(String topic, T data) overload to specify the topic instead:

    public void send(final String topic, final Req requestStub) {
        logger.info("{}::send()", getClass().getCanonicalName());
    
        final ProducerRecord<String, Req> record = new ProducerRecord<>(topic, requestStub);
        ListenableFuture<SendResult<String, Req>> future = template.send(record);
    
        future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {
    
            @Override
            public void onSuccess(SendResult<String, Req> result) {
                logger.info("Sent message=[" + requestStub +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
    
            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to send message=["
                    + requestStub + "] due to : " + ex.getMessage());
            }
        });
    }
    
    public void enqueue(final TaskEvent event) {
        try {
            String topicName = "section" + event.getID();
            taskProducer.send(topicName, event);
        } catch (Exception e) {
            log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
                e.getMessage()));
            throw new RuntimeException(e.getMessage(), e);
        }
    }
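    As a side note, ListenableFuture was deprecated in Spring Framework 6, and in spring-kafka 3.x KafkaTemplate.send() returns a CompletableFuture<SendResult<K, V>>, so the callback above becomes a whenComplete. Here is a minimal sketch of the same success/failure handling; a plain CompletableFuture<String> stands in for the SendResult future so the sketch runs without a broker, and the class and method names are just for illustration:

```java
import java.util.concurrent.CompletableFuture;

public class SendCallbackSketch {

    // Stand-in for the success/failure handling above. In spring-kafka 3.x,
    // template.send(topic, data) returns CompletableFuture<SendResult<K, V>>;
    // a CompletableFuture<String> is used here so the sketch is self-contained.
    static String handle(CompletableFuture<String> future) {
        StringBuilder out = new StringBuilder();
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                out.append("Sent message with result=[").append(result).append("]");
            } else {
                out.append("Unable to send message due to : ").append(ex.getMessage());
            }
        });
        return out.toString();
    }

    public static void main(String[] args) {
        // whenComplete on an already-completed future runs the action
        // synchronously in the calling thread, so the output is deterministic.
        System.out.println(handle(CompletableFuture.completedFuture("offset=7")));
        // → Sent message with result=[offset=7]
    }
}
```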
    
    

    Kafka will automatically create the topic (if auto.create.topics.enable=true, which is the default), but it's recommended to create topics ahead of time so you can control their configuration explicitly.
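
    Since your topic names are derived from event data at runtime, one way to keep control over the configuration is to create the topic explicitly before the first send, using Kafka's AdminClient. A sketch of a hypothetical TopicProvisioner helper (the partition/replica counts and the retention.ms setting are illustrative, not recommendations; it needs a running broker to actually do anything):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class TopicProvisioner {

    private final AdminClient admin;

    public TopicProvisioner(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.admin = AdminClient.create(props);
    }

    // Create the topic with explicit settings if it doesn't exist yet,
    // instead of relying on broker defaults via auto.create.topics.enable.
    public void ensureTopic(String topicName) throws InterruptedException, ExecutionException {
        NewTopic topic = new NewTopic(topicName, 1, (short) 1)
            .configs(Map.of("retention.ms", "604800000")); // e.g. 7 days

        try {
            admin.createTopics(List.of(topic)).all().get();
        } catch (ExecutionException e) {
            // A concurrent or earlier creation is fine; rethrow anything else.
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
        }
    }
}
```

    Your service could then call something like ensureTopic("section" + event.getID()) before taskProducer.send(topicName, event).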