I have a system where a machine reads data and continuously appends it to a .txt file. This file is streamed into a Kafka broker via Kafka Connect, and the data is then pre-processed with some Python code. The machine produces a batch roughly every 5 minutes, so we expect data to come in and then nothing for about 5 minutes until the next batch. The Kafka setup is fine, so please assume everything upstream of this code is working properly.
```python
from confluent_kafka import Consumer
import json

KAFKA_BROKER_URL = 'localhost:9092'
live_data = []


def parse_poll_message(msg):
    row = json.loads(msg)
    # str.split already returns a list, so no list() wrapper is needed
    split_msg = row['payload'].split('\t')
    return split_msg


consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER_URL,
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
})
consumer.subscribe(['my_topic'])

while True:
    # Without a timeout, poll() blocks indefinitely and never returns None,
    # so the break below could never run. The timeout is in seconds.
    msg = consumer.poll(10.0)
    if msg is None:
        break  # nothing arrived within the timeout
    elif msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    else:
        live_data.append(parse_poll_message(msg.value().decode('utf-8')))

consumer.close()
```
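A quick way to check what `parse_poll_message` returns, without a running broker, is to feed it a hand-built record. This assumes Kafka Connect is using `JsonConverter` with `schemas.enable=true`, which wraps each line of the file in a `{"schema": ..., "payload": ...}` envelope; the sample line and its three tab-separated fields are invented for illustration.

```python
import json


def parse_poll_message(msg):
    """Decode one Kafka Connect JSON record and split its payload on tabs."""
    row = json.loads(msg)
    return row['payload'].split('\t')


# Hypothetical record, shaped the way JsonConverter (schemas.enable=true) wraps a line
sample = json.dumps({
    "schema": {"type": "string", "optional": False},
    "payload": "2023-01-01 00:00:00\t12.5\tOK",
})

print(parse_poll_message(sample))  # ['2023-01-01 00:00:00', '12.5', 'OK']
```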
The code above just demonstrates what I would do at a single point in time. What I would like to do is, every 5 minutes, collect all the messages that arrived in that interval, convert them into a dataframe, perform some calculations, and then wait for the next set of messages. How do I keep this loop running while grouping messages into the correct time interval?
Any and all suggestions are appreciated. Thank you!
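A minimal sketch of the loop described above: instead of breaking when `poll()` returns `None`, it polls with a short timeout, buffers rows until a fixed 5-minute window has elapsed, hands the batch to a callback, and starts the next window. `run_windowed`, `handle_batch`, `FakeConsumer` and `FakeMessage` are hypothetical names; the fakes exist only so the example runs without a broker, and a real `confluent_kafka` `Consumer` would be passed in their place.

```python
import json
import time


def parse_poll_message(msg):
    row = json.loads(msg)
    return row['payload'].split('\t')


def run_windowed(consumer, handle_batch, window=300.0, poll_timeout=1.0,
                 clock=time.monotonic, max_windows=None):
    """Collect records for `window` seconds, pass them to `handle_batch`, repeat.

    A short poll timeout lets the loop regain control at least every
    `poll_timeout` seconds, so it notices when the window has ended.
    `max_windows` only exists so the demo terminates; None runs forever.
    """
    done = 0
    while max_windows is None or done < max_windows:
        batch = []
        deadline = clock() + window
        while clock() < deadline:
            msg = consumer.poll(poll_timeout)  # None if nothing arrived in time
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            batch.append(parse_poll_message(msg.value().decode('utf-8')))
        if batch:
            handle_batch(batch)  # e.g. pandas.DataFrame(batch) plus your calculations
        done += 1


# --- Hypothetical test doubles so the sketch runs without a broker ---
class FakeMessage:
    def __init__(self, line):
        self._value = json.dumps({"payload": line}).encode('utf-8')

    def value(self):
        return self._value

    def error(self):
        return None


class FakeConsumer:
    """Delivers (arrival_time, line) pairs against a simulated clock."""
    def __init__(self, arrivals):
        self.now = 0.0
        self.pending = list(arrivals)

    def clock(self):
        return self.now

    def poll(self, timeout):
        if self.pending and self.pending[0][0] <= self.now + timeout:
            arrival, line = self.pending.pop(0)
            self.now = max(self.now, arrival)
            return FakeMessage(line)
        self.now += timeout  # simulate waiting out the whole timeout
        return None


fake = FakeConsumer([(1, "a\t1"), (2, "b\t2"), (301, "c\t3")])
batches = []
run_windowed(fake, batches.append, clock=fake.clock, max_windows=2)
print(batches)  # [[['a', '1'], ['b', '2']], [['c', '3']]]
```

Because the window follows the wall clock rather than the machine's cycle, a batch that straddles a window boundary ends up split across two calls to `handle_batch`.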
Have you tried passing a time interval to the poll function?
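```python
while True:
    msg = consumer.poll(300.0)  # timeout is in seconds: wait up to 5 minutes
    if msg is None:
        break
    elif msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    else:
        live_data.append(parse_poll_message(msg.value().decode('utf-8')))
```

Note that `confluent_kafka`'s `poll()` takes its timeout in seconds, so 5 minutes is `300`, not `300000` (which would be more than 83 hours). `poll()` returns as soon as a single message is available and only returns `None` if nothing arrives within the whole timeout. So this loop keeps collecting messages while a batch is streaming in, and exits once no new message has arrived for 5 minutes, at which point you can do your processing on `live_data`. Since the machine's idle gap is itself only about 5 minutes, a somewhat shorter timeout may detect the end of each batch more reliably.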
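To keep this approach running instead of exiting after the first batch, the `break` can be replaced with "flush and carry on": a `None` from `poll()` means the machine has gone quiet, so the buffered rows are processed and the loop keeps polling. This is a sketch under a few assumptions: the 60-second `idle_timeout` is a placeholder that should be shorter than the idle gap between batches but longer than any pause within a batch, and `run_until_idle_batches`, `handle_batch` and the scripted `FakeConsumer` are hypothetical names used so the example runs without a broker.

```python
import json


def parse_poll_message(msg):
    row = json.loads(msg)
    return row['payload'].split('\t')


def run_until_idle_batches(consumer, handle_batch, idle_timeout=60.0, max_batches=None):
    """Treat `idle_timeout` seconds without a message as the end of a batch.

    `max_batches` only exists so the demo below terminates; leave it as None
    to keep consuming forever.
    """
    batch = []
    handled = 0
    while max_batches is None or handled < max_batches:
        msg = consumer.poll(idle_timeout)   # timeout in seconds
        if msg is None:
            if batch:                       # quiet period after data: batch is complete
                handle_batch(batch)         # e.g. build a DataFrame and run calculations
                batch = []
                handled += 1
            continue                        # keep polling instead of breaking
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        batch.append(parse_poll_message(msg.value().decode('utf-8')))


# --- Hypothetical test doubles so the sketch runs without a broker ---
class FakeMessage:
    def __init__(self, line):
        self._value = json.dumps({"payload": line}).encode('utf-8')

    def value(self):
        return self._value

    def error(self):
        return None


class FakeConsumer:
    """Replays a script: a string is a record, None is a poll() that timed out."""
    def __init__(self, script):
        self.script = list(script)

    def poll(self, timeout):
        item = self.script.pop(0) if self.script else None
        return None if item is None else FakeMessage(item)


fake = FakeConsumer(["a\t1", "b\t2", None, None, "c\t3", None])
batches = []
run_until_idle_batches(fake, batches.append, max_batches=2)
print(batches)  # [[['a', '1'], ['b', '2']], [['c', '3']]]
```

Unlike a fixed wall-clock window, this groups messages by the machine's own rhythm, so a batch is never split as long as the timeout sits between the within-batch pauses and the between-batch gap.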