Tags: apache-kafka, consumer, confluent-cloud

How to consume data from Apache Kafka


I'm participating in a challenge, and it says: your first step is to consume a data sample from Apache Kafka. They give me a topic name, an API_KEY and an API_SECRET, plus a bootstrap server. Then they say that if you're unfamiliar with Kafka, there is comprehensive documentation provided by Confluent. So OK, I signed in to Confluent, made a cluster, and.. what is the next step to consume data?


Solution

  • Here's a basic pattern for putting messages from Kafka into a list in Python.

    from json import loads

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'someTopicName',
        bootstrap_servers=['192.168.1.160:9092'],
        auto_offset_reset='earliest',   # start from the oldest available message
        enable_auto_commit=True,        # commit offsets periodically in the background
        group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')),  # JSON-decode each value
    )
    print("We have a consumer instantiated")
    print(consumer)

    messageCache = []

    # This loop blocks and keeps running until the process is stopped.
    for message in consumer:
        messageCache.append(message.value)
    

    In this case, my Kafka broker is on my private LAN, using the default port, so my bootstrap servers list is just ["192.168.1.160:9092"].
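    Since the challenge hands you an API_KEY and API_SECRET for Confluent Cloud rather than a LAN broker, the same consumer additionally needs SASL_SSL authentication. A sketch of that variant, where the topic name, the bootstrap endpoint, and the credentials are all placeholders you'd replace with the values from the challenge:

    ```python
    from json import loads

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'someTopicName',                     # topic name from the challenge
        bootstrap_servers=['pkc-xxxxx.region.provider.confluent.cloud:9092'],  # placeholder
        security_protocol='SASL_SSL',        # Confluent Cloud requires TLS + SASL
        sasl_mechanism='PLAIN',
        sasl_plain_username='API_KEY',       # the API key you were given
        sasl_plain_password='API_SECRET',    # the API secret you were given
        auto_offset_reset='earliest',
        group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')),
    )

    for message in consumer:
        print(message.value)
    ```

    The bootstrap server and credentials are exactly what the challenge gave you; everything else matches the LAN example above.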

    You can use standard counters and `if` statements to save the list to a file (or wherever), since the Kafka stream is assumed to run forever. For example, I have a process that consumes Kafka messages and, for every 1,000,000 messages, saves them as a dataframe in Parquet on HDFS. In that case I wanted to save historical messages to develop an ML model. The great thing about Kafka is that I could write another process that evaluates, and potentially responds to, every message in real time.
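    That save-every-N-messages pattern can be sketched without any Kafka dependency at all, which also makes it easy to test. The function names, the batch size, and the JSON-lines file naming below are all illustrative choices, not anything from the challenge:

    ```python
    import json

    def write_batch(batch, batch_index, out_dir="."):
        """Write one batch of message values as a JSON-lines file (naming scheme is illustrative)."""
        path = f"{out_dir}/messages-{batch_index:06d}.jsonl"
        with open(path, "w", encoding="utf-8") as f:
            for record in batch:
                f.write(json.dumps(record) + "\n")
        return path

    def consume_in_batches(messages, batch_size, out_dir="."):
        """Drain an iterable of message values, flushing to disk every batch_size records."""
        batch, batch_index, paths = [], 0, []
        for value in messages:
            batch.append(value)
            if len(batch) >= batch_size:
                paths.append(write_batch(batch, batch_index, out_dir))
                batch, batch_index = [], batch_index + 1
        if batch:  # flush whatever is left if the stream ever ends
            paths.append(write_batch(batch, batch_index, out_dir))
        return paths
    ```

    In the real pipeline, `messages` would be `(m.value for m in consumer)` and `batch_size` something like 1,000,000; because any iterable works, the batching logic can be exercised on its own with plain Python data.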