Tags: python, apache-kafka, confluent-kafka-python

How to consume messages in last N days using confluent-kafka-python?


This question is similar to "Python KafkaConsumer start consuming messages from a timestamp", except that I want to know how to do it with the official Python Kafka client by Confluent.

I looked into the Consumer.offsets_for_times function, but I'm confused that it accepts timestamps in the TopicPartition.offset field.

How is an offset equivalent to a timestamp?


Solution

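  • I did this recently for $work. You need to get the result of offsets_for_times(), then assign() that list to your consumer, and then call consume(). Importantly, don't subscribe() to the topic (see Magnus Edenhill's comment on https://github.com/confluentinc/confluent-kafka-python/issues/373).

    You're right that the documentation for this function is somewhat confusing when it comes to timestamps vs offsets. The offset field is reused: in each TopicPartition you pass in, it is read as a timestamp in milliseconds since the epoch, and in each TopicPartition you get back, it holds the earliest offset whose message timestamp is greater than or equal to that timestamp.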

    Update to answer followup question:

    The difference from "How do I get the offset of last message of a Kafka topic using confluent-kafka-python?" is that rather than

    topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
    

    you would do something like this:

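    from datetime import datetime
    from confluent_kafka import TopicPartition

    whents = datetime.fromisoformat("2022-01-01T12:34:56.000")  # naive, so treated as local time
    whenms = int(whents.timestamp() * 1000)   # milliseconds since the epoch
    
    topicparts = [TopicPartition(topic_name, i, whenms) for i in range(0, 8)]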
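
Putting the steps from the answer together for the "last N days" case, a minimal sketch might look like the following. The helper names millis_n_days_ago and assign_from_last_n_days are hypothetical, and discovering the partitions with list_topics() instead of hard-coding range(0, 8) is my own assumption about what you would want; adjust to your setup.

```python
from datetime import datetime, timedelta, timezone


def millis_n_days_ago(days, now=None):
    """Return the Unix timestamp in milliseconds for `days` days before `now`."""
    now = now or datetime.now(timezone.utc)
    return int((now - timedelta(days=days)).timestamp() * 1000)


def assign_from_last_n_days(consumer, topic, days, timeout=10.0):
    """Assign `consumer` to all partitions of `topic`, starting `days` days back."""
    # Imported lazily so the timestamp helper works without the client installed.
    from confluent_kafka import TopicPartition

    # Assumption: look up the partition ids rather than hard-coding them.
    metadata = consumer.list_topics(topic, timeout=timeout)
    partition_ids = metadata.topics[topic].partitions.keys()

    # On input, the offset field carries the timestamp (ms since the epoch).
    start_ms = millis_n_days_ago(days)
    query = [TopicPartition(topic, p, start_ms) for p in partition_ids]

    # On output, the offset field holds the earliest offset whose message
    # timestamp is >= start_ms for that partition.
    offsets = consumer.offsets_for_times(query, timeout=timeout)

    # assign() rather than subscribe(), so the explicit offsets are honored.
    consumer.assign(offsets)
    return offsets
```

With a configured Consumer, you would call assign_from_last_n_days(consumer, topic_name, 7) and then loop over consumer.consume() or consumer.poll() as usual. As far as I know, a partition with no message at or after the timestamp comes back with an offset of -1 (end of partition), so check the returned list if that case matters to you.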