
Spring Kafka and Transactions


I'd like to use Spring Kafka with transactions, but I don't really understand how it is supposed to be configured and how it works.

Here is my configuration:

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ACKS_CONFIG, "all");

This config is used in a DefaultKafkaProducerFactory with a transaction id prefix:

    defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
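
For reference, here is a minimal sketch of how that wiring could look inside a @Configuration class (the bootstrap server, serializers, and bean names are illustrative assumptions; only the transactional settings come from the question):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.transaction.KafkaTransactionManager;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // plus the other settings shown above
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        // Spring appends a numeric suffix to this prefix for each producer it
        // creates, e.g. my_app.0, my_app.1, ...
        factory.setTransactionIdPrefix("my_app.");
        return factory;
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }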

Problem 1:

How am I supposed to choose this transaction id prefix? If I understand correctly, this prefix is used by Spring to generate a transactional id for each producer created.

Why couldn't we just use UUID.randomUUID()?

Problem 2:

If a producer is destroyed, the factory will generate a new transactional id for its replacement. Therefore, if the application crashes, on restart it will reuse old transactional ids.

Is that normal?

Problem 3:

My application is deployed in a cloud environment that can automatically scale instances up and down. This means my prefix cannot be fixed, since the producers on the different instances would have conflicting transactional ids.

Should I add a random part to it? Do I need to recover the same prefix when an instance is scaled down/up or crashes and restarts?

Problem 4:

Last but not least, we are using credentials for our Kafka cluster. This does not seem to work:

    Current ACLs for resource `TransactionalId:my_app*`:
        User:CN... has Allow permission for operations: All from hosts: *

How should I set my ACLs, knowing that my transactional ids are generated?

Edit 1

After further reading, if I understand correctly:

If you have a consumer C0 reading from a partition P0 and the broker starts a consumer rebalance, P0 could be assigned to another consumer C1. This consumer C1 should use the same transactional id as the previous C0 to prevent duplicates (zombie fencing)?

How do you achieve this in spring-kafka? The transaction id seems to have nothing to do with the consumer, and thus with the partition being read.

Thanks


Solution

    1. You can't use a random TID (transactional id) because of zombie fencing - if the server crashes, you could have a partial transaction in the topic which will never be completed, and nothing more will be consumed from any partition with a write for that transaction.

    2. That is by design - for the above reason.

    3. Again, you can't randomize, for the above reason.

    Cloud Foundry, for example, has an environment variable that indicates the instance index. If you are using a cloud platform that doesn't include something like that, you would have to simulate it somehow. Then, use it in the transaction id, as shown below:

    spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
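
    A programmatic equivalent (just a sketch; CF_INSTANCE_INDEX is the variable Cloud Foundry provides - substitute whatever stable per-instance index your platform has; KafkaProperties is Spring Boot's, imports omitted):

        @Bean
        public ProducerFactory<String, String> producerFactory(KafkaProperties kafkaProperties) {
            DefaultKafkaProducerFactory<String, String> pf =
                    new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
            // The index must be stable across restarts, so a restarted instance
            // fences its own zombie producers instead of colliding with others.
            String index = System.getenv("CF_INSTANCE_INDEX");
            pf.setTransactionIdPrefix("foo-" + index + "-");
            return pf;
        }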
    
    4. ACLs - I can't answer that; I am not familiar with Kafka permissions; it might be better to ask a separate question for that.

    5. I think we need to add some logic to Spring to ensure the same transactional id is always used for a particular topic/partition.

    https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929

    EDIT

    Things have changed since this answer was written (KIP-447); if your brokers are 2.5.0 or later, see https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#exactly-once and https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#exactly-once
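
    With 2.5.0+ brokers, a rough sketch of a transactional read-process-write flow looks like this (topic names, the group id, and the class name are illustrative; imports omitted; see the reference docs above for the authoritative setup):

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory,
                KafkaTransactionManager<String, String> tm) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // The container starts a transaction for each delivery and sends the
            // consumer offsets to that transaction before committing it.
            factory.getContainerProperties().setTransactionManager(tm);
            return factory;
        }

        @Component
        public class TransactionalRelay {

            private final KafkaTemplate<String, String> template;

            public TransactionalRelay(KafkaTemplate<String, String> template) {
                this.template = template;
            }

            @KafkaListener(topics = "input", groupId = "my_app")
            public void listen(String value) {
                // Runs inside the container's transaction; the send and the
                // offset commit succeed or roll back together.
                template.send("output", value.toUpperCase());
            }
        }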