I have a rest end point which can be invoked by multiple users at same time. This rest end point invokes a Transactional Kafka Producer. What I understand is I cant use same Kafka Producer instance at same time if we use Transaction.
How can I create a new Kafka Producer Instance for each HTTP request efficiently ?
//Kafka Transaction enabled
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1-" );
@Service
public class ProducerService {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
public void postMessage(final MyUser message) {
// wrapping the send method in a transaction
this.kafkaTemplate.executeInTransaction(kafkaTemplate -> {
kafkaTemplate.send("custom", null, message);
}
}
See the javadocs for the DefaultKafkaProducerFactory
. It maintains a cache of producers for producer-initiated transactions.
/**
* The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance.
* <p>
* This implementation will return the same {@link Producer} instance (if transactions are
* not enabled) for the provided {@link Map} {@code configs} and optional {@link Serializer}
* implementations on each {@link #createProducer()} invocation.
...
* Setting {@link #setTransactionIdPrefix(String)} enables transactions; in which case, a
* cache of producers is maintained; closing a producer returns it to the cache. The
* producers are closed and the cache is cleared when the factory is destroyed, the
* application context stopped, or the {@link #reset()} method is called.
...
*/