java, apache-kafka, spring-kafka, kafka-producer-api

How does maxAge work in the Spring Kafka producer factory?


We are using transactional producers and sometimes we find ourselves in the situation where there is no traffic for more than 7 days, which leads to the loss of transactional id metadata. The next write after this time always causes an error:

org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.

The workaround for now is to restart all the instances in Kubernetes.

Reviewing the documentation, I see that a new property, maxAge, was added in version 2.5.8; it must be set to less than 7 days so that the metadata can be refreshed on the broker side. I have been testing it, but I cannot get it to work as expected.

I have set up a demo project to simulate the behavior. Unless I am making some mistake, maxAge does not refresh the metadata before the expiration of:

transactional.id.expiration.ms

Versions used:

  1. confluentinc/cp-kafka:latest - Kafka 3.4
  2. Spring Boot 3.1.5, JDK 17
  3. Kafka Clients 3.4.1
  4. Spring Kafka 3.0.12
  5. Spring Stream Binder Kafka 4.0.4

The transaction metadata is forced to expire after 10 seconds.

  KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 10000
  KAFKA_PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS: 100
  KAFKA_PRODUCER_ID_EXPIRATION_MS: 10000
  KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS: 10000
  KAFKA_TRANSACTION_REMOVE_EXPIRED_TRANSACTION_CLEANUP_INTERVAL_MS: 10000
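
To double-check that these overrides actually reached the broker, the broker configuration can be read back with the Kafka Admin client. This is only a minimal sketch; the bootstrap address and the broker id "1" are assumptions about the local setup:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerConfigCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1"); // assumed broker id
            Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
            // Should print 10000 if the docker-compose overrides were applied
            System.out.println(config.get("transactional.id.expiration.ms").value());
            System.out.println(config.get("producer.id.expiration.ms").value());
        }
    }
}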

I set maxAge to 7 seconds, expecting the producer to be refreshed before the 10-second expiration and thereby gain another 10 seconds:

import java.time.Duration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class KafkaConfig {

    @Bean
    public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
        return producerFactory -> producerFactory.setMaxAge(Duration.ofSeconds(7));
    }

}
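
For comparison, when the DefaultKafkaProducerFactory is declared as a bean directly (plain Spring Kafka, without the binder), maxAge is set on the factory itself. A minimal sketch; the bootstrap address, serializers and the tx- prefix are illustrative:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

@Bean
public DefaultKafkaProducerFactory<String, String> transactionalProducerFactory() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configs);
    factory.setTransactionIdPrefix("tx-");
    // Cached transactional producers older than this are closed and recreated on the next checkout
    factory.setMaxAge(Duration.ofDays(1));
    return factory;
}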

A test controller to produce a message on demand:

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    private final AtomicInteger semaphore = new AtomicInteger(0);

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping(value = "/test")
    public void sequentialSimple() {
        streamBridge.send("sendTestData-out-0", "testMessage_" + semaphore.getAndIncrement());
    }

}

I expect never to encounter the InvalidPidMappingException, but I always see it:

2023-11-19T13:22:57.234+01:00 DEBUG 28648 --- [fix_localhost-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-transactionTestPrefix_localhost-0, transactionalId=transactionTestPrefix_localhost-0] Transition from state COMMITTING_TRANSACTION to error state ABORTABLE_ERROR

org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.

If I don't handle the ABORTABLE_ERROR (with an AfterRollbackProcessor or a DLQ), I also lose messages. What am I doing wrong?
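
For the lost messages, one option (not part of the demo project) is to publish rolled-back records to a dead-letter topic from an AfterRollbackProcessor. A minimal sketch; the injected KafkaTemplate bean and the back-off of two retries are assumptions:

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> rollbackCustomizer(
        KafkaTemplate<byte[], byte[]> dlqTemplate) {
    return (container, destination, group) -> {
        // After the transaction is rolled back, retry the record twice, then publish it to <topic>.DLT
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlqTemplate);
        container.setAfterRollbackProcessor(
                new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 2L)));
    };
}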

Github Demo Project: https://github.com/Fonexn/kafkaMaxAgeTesting

Thanks

SOLUTION

Set maxAge on each binder's producer factory in this way, for example:

// binderFactory is an injected org.springframework.cloud.stream.binder.BinderFactory
@Bean
KafkaTransactionManager<byte[], byte[]> customKafkaTransactionManager() {
    KafkaMessageChannelBinder kafka =
            (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka1", MessageChannel.class);
    DefaultKafkaProducerFactory<byte[], byte[]> producerFactory =
            (DefaultKafkaProducerFactory<byte[], byte[]>) kafka.getTransactionalProducerFactory();

    // Producers retrieved from the cache after this age are closed and recreated
    producerFactory.setMaxAge(Duration.ofSeconds(60));

    return new KafkaTransactionManager<>(producerFactory);
}

Explanation

  • maxAge does not refresh the metadata. When a transactional producer is retrieved from the cache and its maxAge has been exceeded, the producer is closed and the next one in the cache is examined. This continues until a producer that is young enough is found or the cache is empty; if all the cached producers are too old, a new producer is created (see the illustrative sketch after this list).

    See createTransactionalProducer() and expire().
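
In other words, the age check happens lazily when a producer is taken from the cache. A rough, illustrative sketch of that logic (not the actual Spring Kafka source):

import java.time.Duration;

class CachedTransactionalProducer {

    private final long createdAt = System.currentTimeMillis();

    // Called when the producer is taken from the cache; returns true if it was too old and was closed
    boolean expire(Duration maxAge) {
        boolean tooOld = System.currentTimeMillis() - this.createdAt > maxAge.toMillis();
        if (tooOld) {
            close(); // the caller then tries the next cached producer, or creates a new one
        }
        return tooOld;
    }

    void close() {
        // release the underlying KafkaProducer
    }
}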