I have a consume-transform-produce app with exactly-once semantics in Kafka. The (transactional) produce phase produces new messages on the same topic, which is then consumed transactionally (isolation.level=read_committed). Only one thread does this, and it is ensured that the consumer poll happens after the producer's transaction commit. Right now I have only one poll call per consume-transform-produce round.
When I run my test case, some other producer has sometimes (legitimately) sent messages before my producer's transaction commits. In that case I see the following:
My single poll call returns only this foreign message, but not the message I produced a moment ago, even though the transaction of the previous round committed successfully.
Transactional consumer
```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

final Map<String, Object> consumerConfig = new LinkedHashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, ID);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// Only return messages from committed transactions
consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// Offsets are not committed automatically
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
```
Transactional producer
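```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

final Map<String, Object> producerConfig = new LinkedHashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
// A transactional.id is required to use transactions
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
```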
My poll timeout is 2 seconds.
To understand how poll works: the parameter you pass to poll() is a timeout that controls how long poll() blocks if no data is available in the consumer's buffer. If it is set to 0, poll() returns immediately; otherwise it waits up to the specified time for data to arrive from the broker. So if you call poll with a timeout of 0 ms and there is no data in the buffer, you won't receive any data. (In current clients the timeout is passed as a `java.time.Duration`, e.g. `poll(Duration.ofSeconds(2))`.)
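The timeout contract can be illustrated without a broker. The sketch below is an analogy, not Kafka code: the hypothetical `PollTimeoutDemo` class uses `java.util.concurrent.BlockingQueue.poll(long, TimeUnit)` as a stand-in for the consumer's fetch buffer, and a background thread plays the role of the broker delivering a record after 200 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutDemo {
    // Hypothetical stand-in for the consumer's fetch buffer (not a Kafka class).
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    // Simulates the "broker" delivering a record after the given delay.
    public void deliverLater(String record, long delayMs) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                buffer.put(record);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Waits at most timeoutMs for the first record, then takes whatever else is already buffered.
    public List<String> poll(long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            // Returns null if nothing arrived within timeoutMs; with 0 it returns immediately.
            String first = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                buffer.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        PollTimeoutDemo demo = new PollTimeoutDemo();
        demo.deliverLater("record-1", 200);
        System.out.println("poll(0)    -> " + demo.poll(0));    // returns at once, nothing buffered yet
        System.out.println("poll(2000) -> " + demo.poll(2000)); // blocks until record-1 arrives
    }
}
```

A zero timeout only inspects what is already buffered, while a positive timeout returns as soon as data arrives rather than always waiting the full interval.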
As for not receiving the data that was produced just before, that depends on your producer configuration. A produced message becomes available to consumers only once it has been replicated, and the acks parameter determines how many replicas must acknowledge the write before the producer treats it as complete.
For example, if you have set the replication factor to 3 and acks=all, the message is not made available for the consumer to consume until all in-sync replicas have acknowledged to the leader that they received it.
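A simplified model of this, assuming a single partition with replication factor 3 and all replicas in sync. The class and method names are hypothetical, and real details such as ISR shrinking, `min.insync.replicas`, and transaction markers are ignored. In the model, consumers can only read below the high watermark (the offset every in-sync replica has reached), which is also the point at which an `acks=all` write counts as acknowledged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified model of one partition with a leader and followers (not Kafka code). */
public class ReplicationVisibilityDemo {
    private final List<String> leaderLog = new ArrayList<>();
    // Log end offset of each in-sync replica; index 0 is the leader.
    private final long[] replicaLogEnd;

    public ReplicationVisibilityDemo(int replicationFactor) {
        replicaLogEnd = new long[replicationFactor];
    }

    // The producer writes to the leader only.
    public void append(String record) {
        leaderLog.add(record);
        replicaLogEnd[0] = leaderLog.size();
    }

    // A follower fetches everything the leader currently has.
    public void replicate(int follower) {
        replicaLogEnd[follower] = leaderLog.size();
    }

    // High watermark: the smallest log end offset across the in-sync replicas.
    public long highWatermark() {
        long hw = Long.MAX_VALUE;
        for (long end : replicaLogEnd) hw = Math.min(hw, end);
        return hw;
    }

    // With acks=all, a record is acknowledged once every in-sync replica has it.
    public boolean acknowledgedWithAcksAll(long offset) {
        return offset < highWatermark();
    }

    // Consumers only see records below the high watermark.
    public List<String> readFrom(long position) {
        long hw = highWatermark();
        if (position >= hw) return Collections.emptyList();
        return new ArrayList<>(leaderLog.subList((int) position, (int) hw));
    }

    public static void main(String[] args) {
        ReplicationVisibilityDemo p = new ReplicationVisibilityDemo(3);
        p.append("m0");
        System.out.println(p.readFrom(0)); // [] : only the leader has m0
        p.replicate(1);
        System.out.println(p.readFrom(0)); // [] : follower 2 is still missing m0
        p.replicate(2);
        System.out.println(p.readFrom(0) + " acked=" + p.acknowledgedWithAcksAll(0)); // [m0] acked=true
    }
}
```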
As for your question of how to know whether you have read the entire partition: if poll no longer returns any records (assuming everything else is working correctly), that indicates you have consumed all the messages in that topic.
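The "poll until empty" heuristic can be sketched with an in-memory stand-in for one partition (the hypothetical `ReadToEndDemo` class, not Kafka code), reusing `max.poll.records=100` from the question's config. As an extra check it compares the consumer's position with the partition's end offset. With a real `KafkaConsumer`, the corresponding calls would be `position(TopicPartition)` and `endOffsets(Collection<TopicPartition>)`; under `read_committed`, `endOffsets` returns the last stable offset rather than the log end.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a consumer reading a single partition (not Kafka code). */
public class ReadToEndDemo {
    private final List<String> partition;
    private final int maxPollRecords;
    private int position = 0; // offset of the next record to read

    public ReadToEndDemo(List<String> partition, int maxPollRecords) {
        this.partition = partition;
        this.maxPollRecords = maxPollRecords;
    }

    // Returns up to maxPollRecords records starting at the current position.
    public List<String> poll() {
        int end = Math.min(position + maxPollRecords, partition.size());
        List<String> batch = new ArrayList<>(partition.subList(position, end));
        position = end;
        return batch;
    }

    public int position() { return position; }

    public int endOffset() { return partition.size(); }

    // Keeps polling until a poll comes back empty.
    public List<String> readUntilEmptyPoll() {
        List<String> all = new ArrayList<>();
        List<String> batch;
        while (!(batch = poll()).isEmpty()) {
            all.addAll(batch);
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 250; i++) records.add("m" + i);
        ReadToEndDemo consumer = new ReadToEndDemo(records, 100);
        List<String> read = consumer.readUntilEmptyPoll(); // batches of 100, 100, 50, then empty
        System.out.println(read.size() + " records, caught up: "
                + (consumer.position() == consumer.endOffset()));
    }
}
```

Running `main` prints `250 records, caught up: true`. The position check makes the "caught up" condition explicit instead of inferring it only from an empty batch.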