Tags: apache-kafka, spring-kafka, exactly-once

Exactly-once semantics with Spring Kafka


I'm trying to test my exactly-once configuration to make sure all the configs I set are correct and that the behavior is what I expect.

I seem to be running into a problem with duplicate sends.

    public static void main(String[] args) {
        MessageProducer producer = new ProducerBuilder()
                .setBootstrapServers("kafka:9992")
                .setKeySerializerClass(StringSerializer.class)
                .setValueSerializerClass(StringSerializer.class)
                .setProducerEnableIdempotence(true).build();
        MessageConsumer consumer = new ConsumerBuilder()
                .setBootstrapServers("kafka:9992")
                .setIsolationLevel("read_committed")
                .setTopics("someTopic2")
                .setGroupId("bla")
                .setKeyDeserializerClass(StringDeserializer.class)
                .setValueDeserializerClass(MapDeserializer.class)
                .setConsumerMessageLogic(new ConsumerMessageLogic() {
                    @Override
                    public void onMessage(ConsumerRecord cr, Acknowledgment acknowledgment) {
                        producer.sendMessage(new TopicPartition("someTopic2", cr.partition()),
                                new OffsetAndMetadata(cr.offset() + 1), "something1", "im in transaction", cr.key());
                        acknowledgment.acknowledge();
                    }
                }).build();
        consumer.start();
    }

This is my "test"; you can assume the builder sets the right configuration.

ConsumerMessageLogic is a class that handles the "process" part of the read-process-write cycle that the exactly-once semantics support.
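
For reference, this is roughly the shape of that callback, as inferred from the anonymous class above (a sketch; the real declaration may differ):

    // Hypothetical declaration of ConsumerMessageLogic, inferred from its usage above.
    public interface ConsumerMessageLogic {
        // Called once per consumed record; the Acknowledgment allows a manual commit.
        void onMessage(ConsumerRecord cr, Acknowledgment acknowledgment);
    }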

Inside the producer class I have a sendMessage method, like so:

    public void sendMessage(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata, String sendToTopic, V message, PK partitionKey) {
        try {
            KafkaRecord<PK, V> partitionAndMessagePair = producerMessageLogic.prepareMessage(topicPartition.topic(), partitionKey, message);
            if (kafkaTemplate.getProducerFactory().transactionCapable()) {
                kafkaTemplate.executeInTransaction(operations -> {
                    sendMessage(message, partitionKey, sendToTopic, partitionAndMessagePair, operations);
                    operations.sendOffsetsToTransaction(
                            Map.of(topicPartition, offsetAndMetadata), "bla");
                    return true;
                });
            } else {
                sendMessage(message, partitionKey, topicPartition.topic(), partitionAndMessagePair, kafkaTemplate);
            }
        } catch (Exception e) {
            failureHandler.onFailure(partitionKey, message, e);
        }
    }

I create my consumer like so:

    /**
     * Start the message consumer.
     * Each record event will be delegated to onMessage().
     */
    public void start() {
        initConsumerMessageListenerContainer();
        container.start();
    }

    /**
     * Initialize the Kafka message listener
     */
    private void initConsumerMessageListenerContainer() {
        // start an acknowledging message listener to allow manual commits
        messageListener = consumerMessageLogic::onMessage;

        // start and initialize the consumer container
        container = initContainer(messageListener);

        // sets the number of consumers; the topic partitions will be divided among them
        container.setConcurrency(springConcurrency);
        springContainerPollTimeoutOpt.ifPresent(p -> container.getContainerProperties().setPollTimeout(p));
        if (springAckMode != null) {
            container.getContainerProperties().setAckMode(springAckMode);
        }
    }

    private ConcurrentMessageListenerContainer<PK, V> initContainer(AcknowledgingMessageListener<PK, V> messageListener) {
        return new ConcurrentMessageListenerContainer<>(
                consumerFactory(props),
                containerProperties(messageListener));
    }

When I create my producer, I create it with a UUID as the transaction-id prefix, like so:

    public ProducerFactory<PK, V> producerFactory(boolean isTransactional) {
        DefaultKafkaProducerFactory<PK, V> res = new DefaultKafkaProducerFactory<>(props);
        if (isTransactional) {
            res.setTransactionIdPrefix(UUID.randomUUID().toString());
            res.setProducerPerConsumerPartition(true);
        }
        return res;
    }
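
One detail worth flagging here (it becomes visible in the logs further down): with setProducerPerConsumerPartition(true), spring-kafka derives the transactional.id as prefix + group.id + "." + topic + "." + partition, so a random UUID prefix gives each instance a different transactional.id for the same partition. A minimal sketch of the alternative, assuming every instance shares one fixed prefix (the "my-service-tx-" value is illustrative):

    public ProducerFactory<PK, V> producerFactory(boolean isTransactional) {
        DefaultKafkaProducerFactory<PK, V> res = new DefaultKafkaProducerFactory<>(props);
        if (isTransactional) {
            // Fixed prefix shared by all instances (illustrative value): each instance then
            // derives the same transactional.id for a given group/topic/partition, which is
            // what lets the broker fence a "zombie" producer after a rebalance.
            res.setTransactionIdPrefix("my-service-tx-");
            res.setProducerPerConsumerPartition(true);
        }
        return res;
    }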

Now, after everything is set up, I bring up 2 instances on a topic with 2 partitions, so each instance gets 1 partition of the consumed topic.

I send a message and, in instance A, wait at a breakpoint until the transaction timeout passes (to simulate loss of connection). Once the timeout passes, a rebalance occurs and the other instance (instance B) automatically processes the record and sends it to the target topic.

So far so good. Now when I release the breakpoint on instance A, it says it's rebalancing and couldn't commit, but I still see another output record in my destination topic.

My expectation was that instance A wouldn't continue its work once I released the breakpoint, as the record had already been processed.

Am I doing something wrong? Can this scenario be achieved?

Edit 2:

After Gary's remarks about executeInTransaction, I still get the duplicate record if I freeze one of the instances until the timeout and release it after the other instance has processed the record; the frozen instance then processes and produces the same record to the output topic...

    public static void main(String[] args) {
        MessageProducer producer = new ProducerBuilder()
                .setBootstrapServers("kafka:9992")
                .setKeySerializerClass(StringSerializer.class)
                .setValueSerializerClass(StringSerializer.class)
                .setProducerEnableIdempotence(true).build();

        MessageConsumer consumer = new ConsumerBuilder()
                .setBootstrapServers("kafka:9992")
                .setIsolationLevel("read_committed")
                .setTopics("someTopic2")
                .setGroupId("bla")
                .setKeyDeserializerClass(StringDeserializer.class)
                .setValueDeserializerClass(MapDeserializer.class)
                .setConsumerMessageLogic(new ConsumerMessageLogic() {
                    @Override
                    public void onMessage(ConsumerRecord cr, Acknowledgment acknowledgment) {
                        producer.sendMessage("something1", "im in transaction");
                    }
                }).build();
        consumer.start(producer.getProducerFactory());
    }

The new sendMessage method in the producer, without executeInTransaction:

    public void sendMessage(V message, PK partitionKey, String topicName) {
        try {
            KafkaRecord<PK, V> partitionAndMessagePair = producerMessageLogic.prepareMessage(topicName, partitionKey, message);
            sendMessage(message, partitionKey, topicName, partitionAndMessagePair, kafkaTemplate);
        } catch (Exception e) {
            failureHandler.onFailure(partitionKey, message, e);
        }
    }

I also changed the consumer container creation to use a transaction manager with the same ProducerFactory, as suggested:

    /**
     * Initialize the Kafka message listener
     */
    private void initConsumerMessageListenerContainer(ProducerFactory<PK, V> producerFactory) {
        // start an acknowledging message listener to allow manual commits
        acknowledgingMessageListener = consumerMessageLogic::onMessage;

        // start and initialize the consumer container
        container = initContainer(acknowledgingMessageListener, producerFactory);

        // sets the number of consumers; the topic partitions will be divided among them
        container.setConcurrency(springConcurrency);
        springContainerPollTimeoutOpt.ifPresent(p -> container.getContainerProperties().setPollTimeout(p));
        if (springAckMode != null) {
            container.getContainerProperties().setAckMode(springAckMode);
        }
    }

    private ConcurrentMessageListenerContainer<PK, V> initContainer(AcknowledgingMessageListener<PK, V> messageListener, ProducerFactory<PK, V> producerFactory) {
        return new ConcurrentMessageListenerContainer<>(
                consumerFactory(props),
                containerProperties(messageListener, producerFactory));
    }

    @NonNull
    private ContainerProperties containerProperties(MessageListener<PK, V> messageListener, ProducerFactory<PK, V> producerFactory) {
        ContainerProperties containerProperties = new ContainerProperties(topics);
        containerProperties.setMessageListener(messageListener);
        // the transaction manager uses the same ProducerFactory as the template
        containerProperties.setTransactionManager(new KafkaTransactionManager<>(producerFactory));
        return containerProperties;
    }

My expectation is that once the broker receives the processed record from the frozen instance, it will know that the record was already handled by another instance, since it contains the exact same metadata (or does it? I mean, the PID will be different, but should it be?).

Maybe the scenario I'm looking for isn't even supported by the exactly-once support Kafka and Spring currently provide...

If I have 2 instances doing read-process-write, that means I have 2 producers with 2 different PIDs.

Now when I freeze one of the instances and the unfrozen instance takes over responsibility for processing the record due to a rebalance, it will send the record with its own PID and a sequence number in the metadata.

Now when I release the frozen instance, it sends the same record, but with its own PID, so there's no way the broker will know it's a duplicate...

Am I wrong? How can I avoid this scenario? I thought the rebalance would stop the instance and not let it complete its processing (where it produces the duplicate record), since it no longer has responsibility for that record.
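
The logs below bear this reasoning out: fencing on the broker is keyed on the transactional.id (and its epoch), not on the PID, and with a random UUID prefix the two instances end up with different transactional.ids for the very same partition. Sketched with the values taken from the logs:

    // transactional.id = prefix + group.id + "." + topic + "." + partition
    // frozen instance:   b76e8aba-8149-48f8-857b-a19195f5a20a + bla.someTopic2.0
    // regular instance:  1d8e74d3-8986-4458-89b7-6d3e5756e213 + bla.someTopic2.0
    // Different transactional.ids look like two unrelated producers to the broker,
    // so neither transaction fences the other and both commits go through.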

Adding the logs. Frozen instance: you can see the freeze at 10:53:34; I released it at 10:54:02 (the rebalance time is 10 seconds):

2020-06-16 10:53:34,393 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906]
2020-06-16 10:53:34,394 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906] beginTransaction()
2020-06-16 10:53:34,395 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.doBegin:149] Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906]]
2020-06-16 10:54:02,157 INFO  [${sys:spring.application.name}] [kafka-coordinator-heartbeat-thread | bla] [o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Group coordinator X.X.X.X:9992 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
2020-06-16 10:54:02,181 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Sending offsets to transaction: {someTopic2-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
2020-06-16 10:54:02,189 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0] [i.i.k.s.p.SimpleSuccessHandler.:] Sent message=[im in transaction] with offset=[252] to topic something1
2020-06-16 10:54:02,193 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0] [o.a.k.c.p.i.TransactionManager.:] [Producer clientId=producer-b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0, transactionalId=b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0] Discovered group coordinator X.X.X.X:9992 (id: 1001 rack: null)
2020-06-16 10:54:02,263 INFO  [${sys:spring.application.name}] [kafka-coordinator-heartbeat-thread | bla] [o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Discovered group coordinator 192.168.144.1:9992 (id: 2147482646 rack: null)
2020-06-16 10:54:02,295 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.processCommit:740] Initiating transaction commit
2020-06-16 10:54:02,296 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906] commitTransaction()
2020-06-16 10:54:02,299 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Commit list: {}
2020-06-16 10:54:02,301 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Attempt to heartbeat failed for since member id consumer-bla-1-b3ad1c09-ad06-4bc4-a891-47a2288a830f is not valid.
2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Giving away all assigned partitions as lost since generation has been reset, indicating that consumer is no longer part of the group
2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Lost previously assigned partitions someTopic2-0
2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.ConcurrentMessageListenerContainer.info:279] bla: partitions lost: [someTopic2-0]
2020-06-16 10:54:02,303 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.ConcurrentMessageListenerContainer.info:279] bla: partitions revoked: [someTopic2-0]
2020-06-16 10:54:02,303 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Commit list: {}

The regular instance that takes over the partition and produces the record after the rebalance:

2020-06-16 10:53:46,536 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153]
2020-06-16 10:53:46,537 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153] beginTransaction()
2020-06-16 10:53:46,539 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.doBegin:149] Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153]]
2020-06-16 10:53:46,556 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Sending offsets to transaction: {someTopic2-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
2020-06-16 10:53:46,563 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0] [i.i.k.s.p.SimpleSuccessHandler.:] Sent message=[im in transaction] with offset=[250] to topic something1
2020-06-16 10:53:46,566 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0] [o.a.k.c.p.i.TransactionManager.:] [Producer clientId=producer-1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0, transactionalId=1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0] Discovered group coordinator X.X.X.X:9992 (id: 1001 rack: null)
2020-06-16 10:53:46,668 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.processCommit:740] Initiating transaction commit
2020-06-16 10:53:46,669 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153] commitTransaction()
2020-06-16 10:53:46,672 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Commit list: {}
2020-06-16 10:53:51,673 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Received: 0 records

I noticed they both record the exact same offset to commit:

Sending offsets to transaction: {someTopic2-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}

I thought that when they tried to commit the exact same thing, the broker would abort one of the transactions...

I also noticed that if I reduce transaction.timeout.ms to just 2 seconds, it doesn't abort the transaction no matter how long I freeze the instance in debug...

Maybe the timer for transaction.timeout.ms only starts after I send the message?


Solution

  • You must not use executeInTransaction at all - see its Javadocs; it is used when there is no active transaction, or if you explicitly don't want an operation to participate in an existing transaction.

    You need to add a KafkaTransactionManager to the listener container; it must have a reference to the same ProducerFactory as the template.

    Then the container will start the transaction and, if successful, send the offsets to the transaction.
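
Putting that together, a minimal sketch of the corrected wiring (producerFactory and consumerFactory refer to the methods shown in the question; the generics, topic names, and listener body are illustrative):

    // Sketch only: the container owns the transaction; the listener just sends.
    ProducerFactory<String, String> pf = producerFactory(true);
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);

    ContainerProperties containerProperties = new ContainerProperties("someTopic2");
    // Same ProducerFactory as the template, so the send participates in the
    // transaction the container starts, and the offsets are sent to that transaction.
    containerProperties.setTransactionManager(new KafkaTransactionManager<>(pf));
    containerProperties.setMessageListener((MessageListener<String, String>) cr ->
            // runs inside the container-managed transaction: no executeInTransaction,
            // no manual sendOffsetsToTransaction, no manual acknowledgment
            template.send("something1", "im in transaction"));

    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory(props), containerProperties);
    container.start();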