Search code examples
spring-bootapache-kafkaspring-kafka

Can one Kafka Producer Class have multiple @EventListener methods?


Im using Spring Kafka and wrote Producer Class

@Component
@RequiredArgsConstructor
class Producer {

    private static final String TOPIC = "channels";
    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @EventListener(ApplicationStartedEvent.class)
    public void channels_01() throws IOException {

    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_02()throws IOException {
        
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_03()throws IOException {
        
    }
}

Is it possible to run 3 @Eventlistener annotated methods as producer simutaneouly? 3 methods are sending records to exactly same topic. Will spring container and kafka server recognize them as 3 separate producer clients?


Solution

  • If you want to have three producer clients, then you need to have three KafkaTemplate and use a producerPerThread = true option of the DefaultKafkaProducerFactory:

    /**
     * Set to true to create a producer per thread instead of singleton that is shared by
     * all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
     * physically close the producer when it is no longer needed. These producers will not
     * be closed by {@link #destroy()} or {@link #reset()}.
     * @param producerPerThread true for a producer per thread.
     * @since 2.3
     * @see #closeThreadBoundProducer()
     */
    public void setProducerPerThread(boolean producerPerThread) {
    

    Then you need to ensure that ApplicationEventMulticaster is asynchronous. See SimpleApplicationEventMulticaster:

    /**
     * Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
     * to invoke each listener with.
     * <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
     * executing all listeners synchronously in the calling thread.
     * <p>Consider specifying an asynchronous task executor here to not block the
     * caller until all listeners have been executed. However, note that asynchronous
     * execution will not participate in the caller's thread context (class loader,
     * transaction association) unless the TaskExecutor explicitly supports this.
     * @see org.springframework.core.task.SyncTaskExecutor
     * @see org.springframework.core.task.SimpleAsyncTaskExecutor
     */
    public void setTaskExecutor(@Nullable Executor taskExecutor) {
    

    Register a bean with name applicationEventMulticaster for it and set a desired TaskExecutor to ensure that your @EventListener methods are called in parallel.