Search code examples
apache-kafkatransactionskafka-consumer-apikafka-producer-api

Kafka transactional producer writes messages even if producer commit in not invoked


For me it seems like kafka transactional producer is behaving like a regular producer, the meesages are visible on the topic as send is called for each message. Maybe I am missing something basic. I was expecting the messages to appear in the topic only after the producer commit method is called. In my code below produce.commitTransactions() is commented out but I still get the messages in the topic. Thanks for any pointers.

  public static void main(String[] args) {
        try {
            Properties producerConfig = new Properties();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer-1");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
            producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-1"); // set transaction id
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(producerConfig);

            producer.initTransactions(); //initiate transactions
            try {
                producer.beginTransaction(); //begin transactions
                for (Integer i = 0; i < 1000; i++) {
                    producer.send(new ProducerRecord<String, String>("t_test", i.toString(), "value_" + i));

                }
                // producer.commitTransaction(); //commit

            } catch (KafkaException e) {
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }

            producer.close();
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }

Solution

  • When it comes to transactions in Kafka you need to consider a Producer/Consumer pair. A Producer itself, as you have observed, is just producing data and either committing the transaction or not.

    Only in interplay with a consumer you can "complete" a transaction by setting the KafkaConsumer configuration isolation.level set to read_committed (by default it is set to read_uncommitted). This configuration is described as:

    isolation.level: Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return transactional messages which have been committed. If set to read_uncommitted' (the default), consumer.poll() will return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned unconditionally in either mode.