python apache-kafka apache-zookeeper

Kafka broker does not receive message from python producer


Hello, I use the following configuration to run Kafka with Docker Compose:

compose_kafka.yml

version: '3'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.1.10
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka_manager:
    image: kafkamanager/kafka-manager
    container_name: kafka-manager
    restart: always
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "192.168.1.10:2181"
      APPLICATION_SECRET: "random-secret"

I created a producer that sends messages to the Kafka server:

Generate.py

from faker import Faker

fake = Faker()

class Registered_user:

    @staticmethod
    def get_registered_user():
        return {
            "name": fake.name(),
            "address": fake.address(),
            "created_at": fake.year()
        }

Producer_registered_user.py

import time
import json
from kafka import KafkaProducer
from fake_data import Generate

  
def json_serializer(data):
    return json.dumps(data).encode("utf-8")


producer = KafkaProducer(bootstrap_servers='192.168.1.10:9092',
                         value_serializer=json_serializer)


if __name__ == '__main__':
    while True:
        user = Generate.Registered_user.get_registered_user()
        producer.send('registered_user', user)
        print(user)
        time.sleep(4)


But the consumer does not receive any messages:


Consumer_registered_user.py

import json
from kafka import KafkaConsumer

if __name__ == '__main__':
    consumer = KafkaConsumer(
        bootstrap_servers='192.168.1.10:9092',
        auto_offset_reset="from-beginning",
        group_id="consumer-group-a"
    )

    for message in consumer:
        print("User = {}".format(json.loads(message.value)))

I also checked whether the topic is listed under the topics [screenshot] and whether Kafka received the messages [screenshot].

Could you please help me with this problem?


Solution

  • Check your consumer configuration.

    Sample:

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        'topic_1', 'topic_2',
        bootstrap_servers='192.168.1.10:9092',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my-group'
    )
    
    # Make sure the consumer is subscribed to all required topics
    print(consumer.subscription())
    
    for message in consumer:
        print(f"Received from {message.topic}: {message.value.decode('utf-8')}")
    

    Add the topic to your consumer:

    Consumer_registered_user.py

    import json
    from kafka import KafkaConsumer
    
    if __name__ == '__main__':
        consumer = KafkaConsumer(
            'registered_user',  # Specify the topic you want to consume
            bootstrap_servers='192.168.1.10:9092',
            auto_offset_reset="earliest",  # valid values: "earliest" or "latest"
            group_id="consumer-group-a"
        )
    
        for message in consumer:
            print("User = {}".format(json.loads(message.value.decode('utf-8'))))
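
The manual decode in the loop above can also be handed to the consumer itself. A minimal sketch, assuming the producer keeps using the json_serializer from the question; json_deserializer is a name introduced here for illustration, the sample record is made up, and value_deserializer is the kafka-python constructor option that accepts such a function:

```python
import json


def json_serializer(data):
    # Same serializer as in the producer: dict -> UTF-8 encoded JSON bytes.
    return json.dumps(data).encode("utf-8")


def json_deserializer(raw):
    # Inverse of json_serializer: UTF-8 encoded JSON bytes -> dict.
    return json.loads(raw.decode("utf-8"))


# With kafka-python this would be wired in as follows (not run here,
# because it needs a reachable broker):
#
#   consumer = KafkaConsumer(
#       'registered_user',
#       bootstrap_servers='192.168.1.10:9092',
#       auto_offset_reset='earliest',
#       group_id='consumer-group-a',
#       value_deserializer=json_deserializer,
#   )
#   for message in consumer:
#       print("User = {}".format(message.value))  # message.value is already a dict

if __name__ == "__main__":
    # Made-up sample record, shaped like the output of get_registered_user().
    user = {"name": "Jane Doe", "address": "1 Main St", "created_at": "2020"}
    # Round trip without a broker: what the producer sends is what the consumer decodes.
    assert json_deserializer(json_serializer(user)) == user
    print("round trip ok")
```

Keeping the serializer and deserializer as a matching pair makes it easy to test the message format without a running broker.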
    

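    Issues:

    1. Missing topic subscription: the original consumer is created without a topic, so it is not subscribed to anything. Pass the topic name ('registered_user') as the first argument to KafkaConsumer, or call consumer.subscribe(['registered_user']) afterwards.

    2. Invalid auto_offset_reset value: "from-beginning" is not one of the values kafka-python documents for this option; use "earliest" or "latest". With auto_offset_reset="earliest", a consumer group that has no committed offsets starts reading from the earliest available message in the topic.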
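
Both issues can be spotted before the consumer ever connects. The following is only a sketch: check_consumer_args is a hypothetical helper, not part of kafka-python, and the list of accepted values is assumed to be the two that the kafka-python documentation names for auto_offset_reset.

```python
# Values the kafka-python documentation names for auto_offset_reset (assumption:
# anything else is treated here as a mistake).
VALID_OFFSET_RESETS = ("earliest", "latest")


def check_consumer_args(topics, auto_offset_reset):
    """Hypothetical helper: return a list of problems found in the consumer arguments."""
    problems = []
    if not topics:
        problems.append("no topic given, so the consumer is not subscribed to anything")
    if auto_offset_reset not in VALID_OFFSET_RESETS:
        problems.append(
            "auto_offset_reset={!r} is not one of {}".format(
                auto_offset_reset, VALID_OFFSET_RESETS
            )
        )
    return problems


if __name__ == "__main__":
    # Arguments from the question: no topic and "from-beginning".
    for problem in check_consumer_args([], "from-beginning"):
        print("problem:", problem)
    # Arguments from the corrected consumer: no problems are reported.
    print(check_consumer_args(["registered_user"], "earliest"))
```

Run against the arguments from the question, the helper reports both problems; run against the corrected consumer, it returns an empty list.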

    Ref - https://docs.confluent.io/kafka-clients/python/current/overview.html