Tags: python, apache-kafka, confluent-kafka-python

Read Data from Kafka Broker at Specific Time Intervals (Python)


I have a system where a machine reads data and continuously appends it to a .txt file. This data is ingested into a Kafka broker via Kafka Connect and is then pre-processed with some Python code. The machine runs roughly every 5 minutes, so we expect a batch of data to arrive, followed by about 5 minutes of idle time until the next batch. The Kafka setup is fine, so please assume everything upstream of this code is working properly.

from confluent_kafka import Consumer
import json

KAFKA_BROKER_URL = 'localhost:9092'
live_data = []

def parse_poll_message(msg):
    # Each record is JSON; the tab-separated line from the .txt file is in 'payload'
    row = json.loads(msg)
    return row['payload'].split('\t')

consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER_URL,
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
})

consumer.subscribe(['my_topic'])

try:
    while True:
        # poll() with no timeout blocks indefinitely, so `msg is None` would
        # never be seen; a timeout (in seconds) lets the loop end when data stops
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            break
        elif msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        else:
            live_data.append(parse_poll_message(msg.value().decode('utf-8')))
finally:
    # Always leave the consumer group cleanly, even if parsing raises
    consumer.close()

The code above only demonstrates what I would do at a single point in time. What I would like to do is, every 5 minutes, collect all the messages that arrived in that interval, convert them into a DataFrame, perform some calculations, and then wait for the next set of messages. How do I keep this loop running while grouping the messages into the correct time interval?
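One way to get fixed 5-minute batches, sketched here under assumptions rather than as a definitive implementation, is to poll with a short timeout and check a wall-clock deadline after every poll. The function name `consume_in_windows` and its parameters (`window_seconds`, `poll_timeout`, `max_windows`, `clock`) are hypothetical. The consumer is duck-typed, so anything with a `poll(timeout)` method that returns `None` or a message with `.error()` and `.value()` (as `confluent_kafka.Consumer` does) will work. Messages are grouped by when they are polled, not by their Kafka timestamps.

```python
import time
from typing import Any, Callable, List, Optional


def consume_in_windows(
    consumer: Any,
    parse: Callable[[str], List[str]],
    process_batch: Callable[[List[List[str]]], None],
    window_seconds: float = 300.0,
    poll_timeout: float = 1.0,
    max_windows: Optional[int] = None,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll continuously and hand off one batch of parsed rows per window.

    `consumer` is duck-typed: it needs poll(timeout) returning None or a
    message with .error() and .value(), as confluent_kafka.Consumer does.
    `max_windows` (hypothetical) bounds the loop; None means run forever.
    """
    batch: List[List[str]] = []
    window_end = clock() + window_seconds
    windows_done = 0
    while max_windows is None or windows_done < max_windows:
        # A short timeout keeps the loop responsive to the window deadline
        msg = consumer.poll(poll_timeout)
        if msg is not None:
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
            else:
                batch.append(parse(msg.value().decode("utf-8")))
        if clock() >= window_end:
            # Window closed: hand off what arrived (possibly an empty list)
            process_batch(batch)
            batch = []
            # The next window is measured from the hand-off
            window_end = clock() + window_seconds
            windows_done += 1
```

With the question's setup, you might call `consume_in_windows(consumer, parse_poll_message, handle_batch)` inside a `try`/`finally` that closes the consumer, where `handle_batch` is a hypothetical callback that could build a `pandas.DataFrame(batch)` and run the calculations. Grouping by arrival time is the simpler choice; if exact window boundaries matter, `msg.timestamp()` returns the message's timestamp, which could be used to bucket rows instead.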

Any and all suggestions are appreciated. Thank you!


Solution

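  • Have you tried passing a timeout to the poll function?

    while True:
        # confluent_kafka's poll() timeout is in seconds: wait up to 5 minutes
        msg = consumer.poll(300.0)
        if msg is None:
            break  # no message arrived within 5 minutes
        elif msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        else:
            live_data.append(parse_poll_message(msg.value().decode('utf-8')))


    poll() returns as soon as a single message is available, so the timeout is not a 5-minute collection window; it is the longest the consumer waits for the next message. Once nothing has arrived for 5 minutes, poll() returns None, the loop exits, and you can do your processing on live_data.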
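Instead of breaking out of the loop, the same idea can keep the consumer alive: treat a `None` from `poll()` as the end of a burst, process the buffered rows, and keep polling. This is a hedged sketch; `consume_in_bursts`, `idle_timeout`, and `max_batches` are hypothetical names. The default `idle_timeout` of 60 seconds is an assumption, chosen to be well below the machine's roughly 5-minute idle gap so that the gap is still detected if a batch arrives a little early.

```python
from typing import Any, Callable, List, Optional


def consume_in_bursts(
    consumer: Any,
    parse: Callable[[str], List[str]],
    process_batch: Callable[[List[List[str]]], None],
    idle_timeout: float = 60.0,
    max_batches: Optional[int] = None,
) -> None:
    """Collect messages until the topic goes quiet, process them, repeat.

    `consumer` is duck-typed like confluent_kafka.Consumer; `max_batches`
    (hypothetical) bounds the loop, and None means run forever.
    """
    batch: List[List[str]] = []
    batches_done = 0
    while max_batches is None or batches_done < max_batches:
        # The timeout is in seconds: wait this long for the next message
        msg = consumer.poll(idle_timeout)
        if msg is None:
            # Quiet for idle_timeout seconds, so treat the burst as finished
            if batch:
                process_batch(batch)
                batch = []
                batches_done += 1
            continue  # keep listening for the next burst instead of breaking
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        batch.append(parse(msg.value().decode("utf-8")))
```

Compared with a fixed wall-clock window, this groups rows by the machine's own rhythm, so a batch is never split across two windows; the trade-off is that processing starts `idle_timeout` seconds after the last message of each burst.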