I'm writing an application in which I want to send a message to a given topic if it exists and, if it doesn't, create a new topic and then send my message.
The problem is that I want the new topic name to be based on the data I'm sending to that topic, so I need to be able to pass the topicName to the producer from my service class (below) on the fly.
My config class looks like this:
@EnableKafka
@Configuration
class TimeOnTaskConfig {

    @Bean
    public NewTopic TimeOnTaskQueue() {
        return TopicBuilder.name("testing123")
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public Producer<TaskEvent> timeOnTaskProducer(final KafkaTemplate<String, TaskEvent> template) {
        return new Producer<>(
                Producer.Config.builder()
                        .topicName("testing123")
                        .build(),
                template);
    }

    @Bean
    public KafkaTemplate<String, TaskEvent> kafkaTemplate(
            final ProducerFactory<String, TaskEvent> pf) {
        return new KafkaTemplate<>(pf);
    }
}
But instead of hard-coding "testing123", I'd like to be able to pass a topicName parameter. When I add a String parameter to the timeOnTaskProducer definition, though, I get the error message "Could not autowire. No beans of 'String' type found."
How should I go about sending the topicName as a parameter from my service? My service for reference:
@Service
@Slf4j
public class TimeOnTaskService {

    private final Producer<TaskEvent> taskProducer;

    public TimeOnTaskService(final Producer<TaskEvent> taskProducer) {
        this.taskProducer = taskProducer;
    }

    public void enqueue(final TaskEvent event) {
        try {
            // I'd like to be able to do something here like: taskProducer.setTopicName("section" + event.getID());
            taskProducer.send(event);
        } catch (Exception e) {
            log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
                    e.getMessage()));
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
and my producer:
public class Producer<Req> {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final KafkaTemplate<String, Req> template;
    private final Config config;

    public Producer(Config config, KafkaTemplate<String, Req> template) {
        this.config = config;
        this.template = template;
    }

    public void send(final TaskEvent requestStub) throws ExecutionException, InterruptedException, TimeoutException {
        logger.info("{}::send()", getClass().getCanonicalName());
        final ProducerRecord<String, TaskEvent> record = new ProducerRecord<>(config.topicName, requestStub);
        ListenableFuture future = template.send((ProducerRecord<String, Req>) record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {
            @Override
            public void onSuccess(SendResult<String, Req> result) {
                logger.info("Sent message=[" + requestStub +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to send message=["
                        + requestStub + "] due to : " + ex.getMessage());
            }
        });
    }

    @Getter
    @Builder
    @ToString
    public static class Config {
        private final String topicName;
        private final int sendTimeout;
    }
}
There is no need to create a new producer for every topic you want to publish to. You can simply use the KafkaTemplate and call send(String topic, T data) to specify the topic instead:
public void send(final String topic, final Req requestStub) {
    logger.info("{}::send()", getClass().getCanonicalName());
    // The topic is passed per call; KafkaTemplate builds the record itself,
    // so the unchecked ProducerRecord cast and the raw future type are not needed.
    // This assumes a Spring Kafka version whose send() returns a ListenableFuture, as in the question.
    final ListenableFuture<SendResult<String, Req>> future = template.send(topic, requestStub);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {
        @Override
        public void onSuccess(SendResult<String, Req> result) {
            logger.info("Sent message=[" + requestStub +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.error("Unable to send message=["
                    + requestStub + "] due to : " + ex.getMessage());
        }
    });
}
Your service can then build the topic name from the event and pass it in:
public void enqueue(final TaskEvent event) {
    try {
        String topicName = "section" + event.getID();
        taskProducer.send(topicName, event);
    } catch (Exception e) {
        log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
                e.getMessage()));
        throw new RuntimeException(e.getMessage(), e);
    }
}
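Because the topic name is now derived from data, it may be worth checking it before sending. Kafka only accepts topic names made of ASCII letters, digits, '.', '_' and '-', up to 249 characters. The following is a minimal sketch of such a check; the TopicNames class and its forSection method are hypothetical helpers, not part of Spring Kafka or of the code above.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not a Kafka or Spring API): derives a topic name
// from an ID and rejects names that Kafka would refuse.
final class TopicNames {
    // Kafka topic names may contain only ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Kafka limits topic names to 249 characters.
    private static final int MAX_LENGTH = 249;

    private TopicNames() {
    }

    // Builds "section" + id, mirroring the naming used in enqueue().
    static String forSection(Object id) {
        String name = "section" + id;
        if (name.length() > MAX_LENGTH || !LEGAL.matcher(name).matches()) {
            throw new IllegalArgumentException("Illegal topic name: " + name);
        }
        return name;
    }
}
```

With this in place, the service could call taskProducer.send(TopicNames.forSection(event.getID()), event), so that a bad ID fails fast in your own code rather than at the broker.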
Kafka will automatically create the topic if it does not exist yet (provided auto.create.topics.enable=true is set on the broker, which is the default), but it is strongly recommended to create topics ahead of time so that you can control their configuration explicitly.
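If you do want "create the topic, then send" behaviour from inside the application, one possible approach is to remember which topic names this process has already handled and run a creation step only the first time a name is seen. The sketch below shows just that bookkeeping in plain Java. TopicRegistry and its creator callback are hypothetical names; in a real application the callback would create the topic with the partition and replica settings you want (for example through Kafka's admin client) and should tolerate the topic already existing on the broker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: runs a topic-creation callback at most once per
// topic name within this process.
final class TopicRegistry {
    private final ConcurrentHashMap<String, Boolean> known = new ConcurrentHashMap<>();
    // Assumption: the callback performs the actual topic creation.
    private final Consumer<String> creator;

    TopicRegistry(Consumer<String> creator) {
        this.creator = creator;
    }

    // computeIfAbsent is atomic per key, so concurrent callers for the same
    // topic wait for the first creation attempt instead of repeating it.
    // If the callback throws, nothing is recorded and a later call retries.
    void ensureExists(String topic) {
        known.computeIfAbsent(topic, t -> {
            creator.accept(t);
            return Boolean.TRUE;
        });
    }
}
```

The service would call ensureExists(topicName) right before taskProducer.send(topicName, event). Note that this only tracks topics seen by the current process; it knows nothing about topics that already exist on the broker.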