transactions, apache-kafka, kafka-consumer-api, kafka-producer-api

Kafka: Polling after producer transaction doesn't get the produced messages


I have a consume-transform-produce app with exactly-once semantics in Kafka. The (transactional) produce phase writes new messages to the same topic that is then consumed (transactionally, with isolation.level=read_committed). There is only one thread doing this, and it is ensured that the consumer poll happens after the producer's transaction commit. Right now I have only one poll statement per consume-transform-produce round.

Test case

When I run my test case, there may sometimes be messages that some other producer sent (legitimately) before my producer's transaction commits. Then I experience the following:

My single poll statement returns only this foreign message, but not the message I produced a moment ago, although the transaction of the last round committed successfully.

Questions

  1. Am I missing something, so that my transaction result from the last round is not visible to the consumer in the next round?
  2. Do I have to issue multiple polls until one poll returns 0 records, and does that tell me that all messages on the server in that partition have been read?
  3. Is it possible that Kafka cannot guarantee that all messages currently on the partition are read? Maybe there is no such thing as "I am finished with reading this partition for now"?

Configuration

  • Transactional consumer

    final Map<String, Object> consumerConfig = new LinkedHashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
    consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, ID);
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    // Only return records of committed transactions
    consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // Offsets are committed as part of the producer transaction, not automatically
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

  • Transactional producer

    final Map<String, Object> producerConfig = new LinkedHashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
    // Setting a transactional id makes this a transactional producer
    producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

  • My poll timeout is 2 seconds

  • My understanding is that transactional producers are automatically idempotent and use acks=all
  • My test case uses only one broker and a replication factor of one, but of course I intend to use more in production
  • I use Kafka 2.0
  • My topic only has one partition
  • My thread has its own consumer group and is assigned to this single partition

Solution

  • To understand how poll works: the parameter passed to poll() is a timeout that controls how long poll() blocks if no data is available in the consumer buffer. If it is set to 0, poll() returns immediately; otherwise, it waits up to the specified number of milliseconds for data to arrive from the broker. So if you call poll with 0 milliseconds and there is no data in the buffer, you won't receive any data.

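    As for not receiving the data that was produced recently: that depends on the configuration of your producer. A message becomes available to consumers only after it has been replicated to the in-sync replicas, and the acks parameter controls when the producer considers the send acknowledged. With isolation.level=read_committed, a transactional message additionally becomes visible only after its transaction has committed.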

    For example: with a replication factor of 3 and acks=all, the message is not made available to the consumer until all in-sync replicas have acknowledged to the leader that they received it.
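For a transactional producer these settings are implied, as the question assumes: setting `transactional.id` enables idempotence, which in turn requires `acks=all`. A minimal sketch that writes them out using the plain property names (the class `ProducerSettings`, the helper `transactionalProducerProps` and its arguments are hypothetical, not part of the original configuration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ProducerSettings {
    /** Builds producer properties with the settings implied by a transactional id written out. */
    static Map<String, Object> transactionalProducerProps(String bootstrapServers, String transactionalId) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("transactional.id", transactionalId);
        // Implied by transactional.id; listed here only to make them visible.
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The resulting map can be passed to the `KafkaProducer` constructor in place of the `producerConfig` from the question.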

    As for how to know whether you have read the entire partition: if poll no longer returns any records (assuming everything else is working), that generally indicates that you have consumed all messages currently available in that partition.
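An empty poll can also just mean that the timeout expired before a fetch returned, so a more explicit check is to compare the consumer's position with the partition's end offset. Below is a minimal sketch of such a loop, with the consumer hidden behind a hypothetical `PartitionReader` interface so the logic runs without a broker. With a real `KafkaConsumer`, the three methods would map to `position(TopicPartition)`, `endOffsets(Collection<TopicPartition>)` and `poll(Duration)`. For a `read_committed` consumer, `endOffsets` reports the last stable offset, so the records of an already committed transaction fall within the target.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionDrain {
    /** Hypothetical minimal view of a consumer assigned to a single partition. */
    interface PartitionReader<R> {
        long position();   // offset of the next record to be fetched
        long endOffset();  // end offset (last stable offset under read_committed)
        List<R> poll();    // one poll; may return an empty list on timeout
    }

    /**
     * Polls until the position reaches the end offset observed at the start,
     * or until maxPolls polls have been issued (guard against looping forever).
     */
    static <R> List<R> drainToEnd(PartitionReader<R> reader, int maxPolls) {
        long target = reader.endOffset(); // snapshot: later writes are left for the next round
        List<R> out = new ArrayList<>();
        int polls = 0;
        while (reader.position() < target && polls < maxPolls) {
            out.addAll(reader.poll()); // an empty result does not end the loop
            polls++;
        }
        return out;
    }
}
```

Taking the end offset once, before the loop, keeps each round bounded: messages appended while draining are picked up in the next consume-transform-produce round.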