Search code examples
spring-bootapache-kafkaspring-kafkakafka-producer-api

Multiple Kafka Producer Instance for each Http Request


I have a rest end point which can be invoked by multiple users at same time. This rest end point invokes a Transactional Kafka Producer. What I understand is I cant use same Kafka Producer instance at same time if we use Transaction.

How can I create a new Kafka Producer Instance for each HTTP request efficiently ?

//Kafka Transaction enabled
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1-" );

@Service
public class ProducerService {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    public void postMessage(final MyUser message) {
        // wrapping the send method in a transaction
        this.kafkaTemplate.executeInTransaction(kafkaTemplate -> {
              kafkaTemplate.send("custom", null, message);
        }

    }

Solution

  • See the javadocs for the DefaultKafkaProducerFactory. It maintains a cache of producers for producer-initiated transactions.

    /**
     * The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance.
     * <p>
     * This implementation will return the same {@link Producer} instance (if transactions are
     * not enabled) for the provided {@link Map} {@code configs} and optional {@link Serializer}
     * implementations on each {@link #createProducer()} invocation.
    
    ...
    
     * Setting {@link #setTransactionIdPrefix(String)} enables transactions; in which case, a
     * cache of producers is maintained; closing a producer returns it to the cache. The
     * producers are closed and the cache is cleared when the factory is destroyed, the
     * application context stopped, or the {@link #reset()} method is called.
    
    ...
    
     */