Search code examples
spring-kafka

How is the DefaultKafkaProducerFactory cache managed for transactions?


In the spring kafka documentation https://docs.spring.io/spring-kafka/docs/2.3.3.RELEASE/reference/html/#transactions

It mentions;

Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n

  • How is this cache configured e.g. producer pool size?
  • Does it dynamically create a new producer when there isn't any available producers from the cache for the given transaction?

Solution

  • It depends on whether the transaction is producer only and the producerPerConsumerPartition which is true by default (for consumer initiated transactions).

    This property is to support EOSMode.ALPHA (or fallback to ALPHA when BETA is used but the broker is older than 2.5).

    See here for more information about exactly once semantics.

    When using producerPerConsumerPartition=false and for producer-only transactions, there is no limit to the cache size; new producers are created when the cache is empty, and returned to the cache when "closed".