python, apache-kafka, consumer, kafka-python

Get results from Kafka for a specific period of time


Here is my code, which uses kafka-python.

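from datetime import datetime

from dateutil.relativedelta import relativedelta
from kafka import KafkaConsumer

now = datetime.now()
# months=1 subtracts one month; month=1 would set the month to January instead
month_ago = now - relativedelta(months=1)
topic = 'some_topic_name'
consumer = KafkaConsumer(topic, bootstrap_servers=PROD_KAFKA_SERVER,
                         security_protocol=PROTOCOL,
                         group_id=GROUP_ID,
                         enable_auto_commit=False,
                         sasl_mechanism=SASL_MECHANISM, sasl_plain_username=SASL_USERNAME,
                         sasl_plain_password=SASL_PASSWORD)

# Iterates over every message the consumer receives, with no time filter yet
for msg in consumer:
    print(msg)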

I want to read, in a loop, only the messages from the topic that were produced between month_ago and now. How can I do this?

Thanks for any help!


Solution

  • I finally got it working :) My code looks like this:

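    import json
    from datetime import datetime

    from dateutil.relativedelta import relativedelta
    from kafka import KafkaConsumer, TopicPartition

    topic = 'some_topic_name'
    consumer = KafkaConsumer(bootstrap_servers=PROD_KAFKA_SERVER,
                             security_protocol=PROTOCOL,
                             group_id=GROUP_ID,
                             sasl_mechanism=SASL_MECHANISM, sasl_plain_username=SASL_USERNAME,
                             sasl_plain_password=SASL_PASSWORD)

    month_ago = (datetime.now() - relativedelta(months=1)).timestamp()
    # Assign the partition manually; this example reads partition 0 only
    topic_partition = TopicPartition(topic, 0)
    assigned_topic = [topic_partition]
    consumer.assign(assigned_topic)

    partitions = consumer.assignment()
    # Kafka timestamps are in milliseconds since the epoch
    partition_to_timestamp = {part: int(month_ago * 1000) for part in partitions}
    # Offset just past the last message currently in each partition
    end_offsets = consumer.end_offsets(list(partition_to_timestamp.keys()))

    # Earliest offset per partition whose timestamp is >= the requested one
    mapping = consumer.offsets_for_times(partition_to_timestamp)
    for partition, ts in mapping.items():
        end_offset = end_offsets.get(partition)
        # ts is None when the partition has no message at or after the timestamp
        if ts is None or ts.offset >= end_offset:
            continue
        consumer.seek(partition, ts.offset)
        for msg in consumer:
            value = json.loads(msg.value.decode('utf-8'))
            # do something
            if msg.offset >= end_offset - 1:
                break
    consumer.close()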
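
The solution above reads partition 0 only. Below is a rough sketch of how the same seek-by-timestamp idea might be extended to several partitions, using poll() instead of iterating the consumer so that each partition can be stopped independently. The helper names (millis_ago, consume_since) and the parameters poll_timeout_ms and max_idle_polls are made up for illustration and are not part of kafka-python. The consumer argument is assumed to behave like a kafka-python KafkaConsumer (assign, offsets_for_times, end_offsets, seek, position, pause, poll). The 30-day window is only an approximation of "one month" that avoids the dateutil dependency.

```python
from datetime import datetime, timedelta, timezone


def millis_ago(days, now=None):
    """Epoch milliseconds for `days` days before `now` (default: current UTC time)."""
    now = now or datetime.now(timezone.utc)
    return int((now - timedelta(days=days)).timestamp() * 1000)


def consume_since(consumer, partitions, start_ms,
                  poll_timeout_ms=1000, max_idle_polls=10):
    """Yield records from `partitions` starting at `start_ms`.

    Reading stops at the end offsets captured when the function starts,
    so records produced afterwards are not returned. `consumer` is assumed
    to expose the kafka-python KafkaConsumer methods used below.
    """
    partitions = list(partitions)
    consumer.assign(partitions)
    starts = consumer.offsets_for_times({tp: start_ms for tp in partitions})
    end_offsets = consumer.end_offsets(partitions)

    remaining = set()
    for tp in partitions:
        found = starts.get(tp)
        # None means the partition has no record at or after start_ms.
        if found is None or found.offset >= end_offsets[tp]:
            consumer.pause(tp)
            continue
        consumer.seek(tp, found.offset)
        remaining.add(tp)

    idle = 0
    # Give up after several consecutive empty polls instead of looping forever.
    while remaining and idle < max_idle_polls:
        batches = consumer.poll(timeout_ms=poll_timeout_ms)
        idle = 0 if batches else idle + 1
        for tp, records in batches.items():
            for record in records:
                if record.offset < end_offsets[tp]:
                    yield record
        # A partition is done once the consumer position reaches its end offset.
        for tp in list(remaining):
            if consumer.position(tp) >= end_offsets[tp]:
                consumer.pause(tp)
                remaining.discard(tp)
```

With kafka-python, the partition list could probably be built as [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)] and the start passed as millis_ago(30). Checking position() rather than the offset of the last record is a deliberate choice here: on compacted or transactional topics the final offset may not correspond to a record that is ever delivered.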