python apache-kafka confluent-kafka-python

Consume from Kafka without an infinite loop


I'm currently using the Confluent Kafka Python client to consume messages from a Kafka topic, and the code runs fine inside a while True loop, as shown in the examples in the documentation. However, I would like to set up a cron job that consumes from the topic only once a day. The idea is that the job checks the topic in the morning, consumes all the messages that are in the topic at that point in time, and then stops. I tried to achieve this in Python like this:

msg = kafka_consumer.consume()
while msg:
  msg_val = msg.value().decode('utf-8')
  # do something with msg
  msg = kafka_consumer.consume()

The problem is that this never ends up consuming anything. I guess the first call never gets a message on the first try. It only works with while True, but I don't want this code to run indefinitely, just until the last message present at that point in time has been consumed.


Solution

  • You could check the consumer group's offsets within the loop, then break out of the loop once you are within some threshold of the "end" (each partition's high watermark at the time the job started)

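    You might also want to control how many records you get back per call. Note that max.poll.records is a Java consumer setting; as far as I know the confluent-kafka-python client (built on librdkafka) does not accept it, and the num_messages argument of consume() plays that role instead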
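
The offset check above could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the helper names (snapshot_targets, drain_to_snapshot, run_daily_job), the broker address, the group id, and the topic name are all hypothetical. It assumes auto.offset.reset is set to earliest, so a group with no committed offset starts at the low watermark. It also gives up after max_idle_polls empty polls in a row, a safety valve I added because the last offset before the high watermark is not always a deliverable message (for example with compacted or transactional topics). Keep in mind that poll() with a timeout can return None while the consumer is still joining the group, which is one reason a loop that stops at the first empty result may never process anything.

```python
def snapshot_targets(consumer, assignment, timeout=10.0):
    """Return {(topic, partition): high watermark} for partitions that still have unread messages."""
    committed = {
        (tp.topic, tp.partition): tp.offset
        for tp in consumer.committed(assignment, timeout=timeout)
    }
    targets = {}
    for tp in assignment:
        offsets = consumer.get_watermark_offsets(tp, timeout=timeout)
        if offsets is None:
            raise RuntimeError(f"timed out fetching watermarks for {tp.topic}[{tp.partition}]")
        low, high = offsets
        start = committed.get((tp.topic, tp.partition), -1)
        if start < 0:
            # No committed offset yet. Assumption: auto.offset.reset=earliest.
            start = low
        start = max(start, low)
        if start < high:
            targets[(tp.topic, tp.partition)] = high
    return targets


def drain_to_snapshot(consumer, handle, poll_timeout=1.0, max_idle_polls=30):
    """Consume until each assigned partition reaches the end offset it had when
    the assignment was first seen. Returns the number of handled messages."""
    remaining = None  # unknown until the group assignment arrives
    handled = 0
    idle = 0
    while remaining is None or remaining:
        msg = consumer.poll(poll_timeout)

        if remaining is None:
            assignment = consumer.assignment()
            if assignment:
                remaining = snapshot_targets(consumer, assignment)

        if msg is None:
            idle += 1
            if idle >= max_idle_polls:
                break  # safety valve: stop waiting for messages that never come
            continue
        idle = 0

        if msg.error():
            raise RuntimeError(f"Kafka error: {msg.error()}")

        handle(msg)
        handled += 1

        key = (msg.topic(), msg.partition())
        if remaining is not None and key in remaining and msg.offset() + 1 >= remaining[key]:
            del remaining[key]  # this partition has reached the snapshot
    return handled


def run_daily_job():
    # Imported here so the helpers above can be exercised without a broker.
    from confluent_kafka import Consumer

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # hypothetical broker address
        "group.id": "daily-job",                # hypothetical group id
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["my-topic"])            # hypothetical topic name
    try:
        drain_to_snapshot(consumer, lambda m: print(m.value().decode("utf-8")))
    finally:
        consumer.close()  # leaves the group; commits offsets if auto-commit is on
```

The cron entry would then just call run_daily_job(). Messages produced after the snapshot are left for the next run, because the loop stops at the high watermark it recorded rather than at whatever the end happens to be later.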