I'm participating in a challenge, and it says something like: your first step is to consume a data sample from Apache Kafka. They give me a topic name, an API_KEY and an API_SECRET. Oh, and a bootstrap server. Then they say that if you're unfamiliar with Kafka, there is comprehensive documentation provided by Confluent. So OK, I sign in to Confluent, make a cluster and... what is the next step to consume the data?
Here's a basic pattern for putting messages from Kafka into a list in Python.
from json import loads  # needed for the value_deserializer below
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'someTopicName',
    bootstrap_servers=['192.168.1.160:9092'],
    auto_offset_reset='earliest',   # start from the oldest available message
    enable_auto_commit=True,        # periodically commit offsets for this group
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))  # assumes JSON payloads

print("We have a consumer instantiated")
print(consumer)

messageCache = []

# The consumer is an iterator that blocks until new messages arrive,
# so this loop runs indefinitely.
for message in consumer:
    messageCache.append(message.value)
In this case, my Kafka broker is on my private LAN, using the default port, so my bootstrap servers list is just ["192.168.1.160:9092"].
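Since your challenge hands you an API_KEY and API_SECRET, you're almost certainly pointed at Confluent Cloud rather than a broker on your LAN, which means the consumer also needs SASL_SSL authentication. Here's a sketch of what that configuration might look like with kafka-python; the bootstrap server hostname is a placeholder, and the actual key, secret, and topic name come from your challenge instructions:

from json import loads
from kafka import KafkaConsumer

# Placeholder endpoint and credentials -- substitute the values
# from your challenge instructions.
consumer = KafkaConsumer(
    'someTopicName',
    bootstrap_servers=['pkc-xxxxx.us-east-1.aws.confluent.cloud:9092'],
    security_protocol='SASL_SSL',   # Confluent Cloud requires TLS
    sasl_mechanism='PLAIN',         # the API key/secret act as username/password
    sasl_plain_username='API_KEY',
    sasl_plain_password='API_SECRET',
    auto_offset_reset='earliest',
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))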
You can use standard counters and if statements to save the list to a file or whatever, since the Kafka stream is assumed to run forever. For example, I have a process that consumes Kafka messages and, for every 1,000,000 messages, saves them as a dataframe in parquet format to HDFS. In that case I wanted to save historical messages to develop an ML model; the great thing about Kafka is that I could also write another process that evaluated, and potentially responded to, every message in real time. A sketch of that counter pattern is below.
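Here's roughly what that batching loop could look like. The batch size and file names are illustrative, and I'm writing local parquet files with pandas (which needs pyarrow or fastparquet installed) rather than the HDFS writer I actually use:

import pandas as pd

BATCH_SIZE = 1_000_000  # illustrative; tune to your memory budget
batch = []
file_index = 0

for message in consumer:
    batch.append(message.value)
    if len(batch) >= BATCH_SIZE:
        # Flush the accumulated messages to a parquet file and start over
        pd.DataFrame(batch).to_parquet(f"messages_{file_index:05d}.parquet")
        batch = []
        file_index += 1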