This question is similar to Python KafkaConsumer start consuming messages from a timestamp except I want to know how to do it in the official Python Kafka client by Confluent.
I looked into the Consumer.offsets_for_times function, but I'm confused that it accepts timestamps in the TopicPartition.offset field.
How is an offset equivalent to a timestamp?
I did this recently for $work. You need to get the result of offsets_for_times(), then assign() that list to your consumer, and then call consume(). Importantly, don't subscribe() to the topic. (See Eden Hill's comment on https://github.com/confluentinc/confluent-kafka-python/issues/373.)
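You're right that the documentation for this function is somewhat confusing when it comes to defining timestamps vs offsets. On input, the offset field of each TopicPartition carries the timestamp to look up (in milliseconds since the epoch); in the list that is returned, the same field holds the actual offset.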
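The steps above can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the helper name assign_from_timestamps, the broker address, the group id, the topic name and the partition count are all assumptions made for illustration. The helper only relies on the consumer having offsets_for_times() and assign(), so it never calls subscribe().

```python
def assign_from_timestamps(consumer, timestamped_partitions, timeout=10.0):
    """Resolve timestamps to offsets and assign them to the consumer.

    `timestamped_partitions` is a list of TopicPartition objects whose
    offset field holds a timestamp in milliseconds since the epoch.
    (Hypothetical helper name, for illustration only.)
    """
    # Ask the broker for the offsets that correspond to each timestamp.
    offsets = consumer.offsets_for_times(timestamped_partitions, timeout)

    # Each returned partition may carry its own error; stop early if any does.
    failed = [tp for tp in offsets if tp.error is not None]
    if failed:
        raise RuntimeError(f"offset lookup failed for: {failed}")

    # assign() (not subscribe()) so the looked-up offsets are used as the start.
    consumer.assign(offsets)
    return offsets


def run_example():
    # Imported here so the helper above works with any consumer-like object.
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # assumed broker address
        "group.id": "timestamp-reader",         # hypothetical group id
        "enable.auto.commit": False,
    })
    when_ms = 1641040496000  # 2022-01-01T12:34:56 UTC, in milliseconds
    topic_name = "my_topic"  # hypothetical topic with 8 partitions
    partitions = [TopicPartition(topic_name, i, when_ms) for i in range(8)]
    try:
        assign_from_timestamps(consumer, partitions)
        for msg in consumer.consume(num_messages=10, timeout=5.0):
            if msg.error():
                print("error:", msg.error())
                continue
            print(msg.partition(), msg.offset(), msg.timestamp(), msg.value())
    finally:
        consumer.close()
```

Calling run_example() needs a reachable broker; assign_from_timestamps() on its own can be reused with an existing consumer.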
Update to answer followup question:
The difference from How do I get the offset of last message of a Kafka topic using confluent-kafka-python? is that rather than
topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
you would do something like this:
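from datetime import datetime
from confluent_kafka import TopicPartition
whents = datetime.fromisoformat("2022-01-01T12:34:56.000")
whenms = int(whents.timestamp() * 1000)  # milliseconds since the epoch; a naive datetime is treated as local time
topicparts = [TopicPartition(topic_name, i, whenms) for i in range(0, 8)]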
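A timestamp string without a UTC offset is ambiguous, so it may be worth pinning the time zone before converting to milliseconds. Here is a minimal sketch using only the standard library; the function name iso_to_epoch_ms and the assume_tz parameter are hypothetical, and defaulting to UTC is an assumption you may want to change.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_epoch_ms(iso_string, assume_tz=timezone.utc):
    """Convert an ISO 8601 string to integer milliseconds since the epoch.

    Strings without a UTC offset are interpreted in `assume_tz`
    (UTC by default, which is an assumption, not a Kafka requirement).
    """
    dt = datetime.fromisoformat(iso_string)
    if dt.tzinfo is None:
        # Naive datetime: attach the assumed time zone explicitly.
        dt = dt.replace(tzinfo=assume_tz)
    # Integer division by a timedelta avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


whenms = iso_to_epoch_ms("2022-01-01T12:34:56.000")
```

The resulting whenms can then be passed as the third argument to TopicPartition, as in the list comprehension above.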