I'd like to use Spring Kafka with Transactions but I don't really understand how it is supposed to be configured and how it works.
Here is my configuration:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ACKS_CONFIG, "all");
This config is used in a DefaultKafkaProducerFactory with transaction id prefix:
defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
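For completeness, this is roughly how it is wired together (a simplified sketch of my configuration; the bean names, serializers and bootstrap servers value are just placeholders):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public DefaultKafkaProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // placeholder values; serializers and bootstrap servers are whatever the app actually uses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // the properties shown above
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        // setting a prefix enables transactions; Spring appends a suffix for each physical producer
        factory.setTransactionIdPrefix("my_app.");
        return factory;
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            DefaultKafkaProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(
            DefaultKafkaProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}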
Problem 1:
How am I supposed to choose this transaction id prefix? If I understand correctly, this prefix is used by Spring to generate a transactional id for each producer created.
Why couldn't we just use UUID.randomUUID()?
Problem 2:
If the producer is destroyed, it will generate a new transactional id. Therefore, if the application crashes, on restart it will reuse the old transactional ids.
Is that normal?
Problem 3:
I'm using an application deployed in the cloud that can be automatically scaled up/down. This means my prefix cannot be fixed, since the producers on the different instances would have conflicting transactional ids.
Should I add a random part to it? Do I need to recover the same prefix when an instance is scaled down/up, or crashes and restarts?
Problem 4:
Last but not least, we are using credentials for our Kafka. This does not seem to work:
Current ACLs for resource `TransactionalId:my_app*`:
User:CN... has Allow permission for operations: All from hosts: *
How should I set my ACLs, knowing that my transactional ids are generated?
Edit 1
After further reading, if I understand correctly:
If you have a consumer C0 reading from partition P0, and the broker starts a consumer rebalance, P0 could be assigned to another consumer C1. This consumer C1 should use the same transaction id as the previous C0 to prevent duplicates (zombie fencing)?
How do you achieve this in spring-kafka? The transaction id seems to have nothing to do with the consumer, and thus with the partition being read.
Thanks
You can't use a random TID because of zombie fencing - if the server crashes, you could have a partial transaction in the topic which will never be completed, and nothing more will be consumed from any partitions with a write for that transaction.
That is by design - for the above reason.
Again, you can't randomize; for the above reason.
Cloud Foundry, for example, has an environment variable that indicates the instance index. If you are using a cloud platform that doesn't include something like that, you would have to simulate it somehow. Then, use it in the transaction id:
spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
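If you are configuring the factory yourself instead of through Boot properties, the same idea applies programmatically; a rough sketch (CF_INSTANCE_INDEX is Cloud Foundry's variable, substitute whatever your platform provides):

// Rough sketch: derive a stable, per-instance prefix from the platform's instance index,
// so instances don't collide and each instance gets the same prefix back after a restart.
String instanceIndex = System.getenv().getOrDefault("CF_INSTANCE_INDEX", "0");
defaultKafkaProducerFactory.setTransactionIdPrefix("my_app-" + instanceIndex + "-");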
ACLs - I can't answer that; I am not familiar with Kafka permissions; it might be better to ask a separate question for that.
I think we need to add some logic to Spring to ensure the same transaction id is always used for a particular topic/partition.
https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929
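The idea discussed in that issue is to tie the transactional id for container-started transactions to the group/topic/partition, so the consumer that takes over a partition after a rebalance reuses, and thereby fences, its predecessor's id. Conceptually it looks like this (a sketch of the naming scheme only, not the actual framework code):

// Conceptual sketch only: a fencing-friendly transactional id is a function of the
// group/topic/partition being processed, so the new owner of a partition reuses
// (and therefore fences) the id used by the previous owner.
String transactionalId(String prefix, String groupId, String topic, int partition) {
    return prefix + groupId + "." + topic + "." + partition;
}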
EDIT
Things have changed since this answer was written (KIP-447); if your brokers are 2.5.0 or later, see https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#exactly-once and https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#exactly-once
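With those brokers, fencing is based on the consumer group metadata rather than on per-partition transactional ids. In spring-kafka 2.5.x that is selected through the container's EOSMode; a sketch, assuming the 2.5.x API:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;

// Sketch for spring-kafka 2.5.x with brokers >= 2.5.0: use the group-metadata-based
// fencing from KIP-447 instead of relying on per-partition transactional ids.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTransactionManager<String, String> tm) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setTransactionManager(tm);
    factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.BETA);
    return factory;
}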