Tags: python, apache-kafka, kafka-consumer-api, kafka-producer-api

Kafka Consumer not receiving messages from a topic: how to debug?


Producer.py

from binance.websocket.spot.websocket_client import SpotWebsocketClient
from binance.spot import Spot as Client
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
import multiprocessing
import logging
import json
import time
import os


def json_serializer(data):
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=json_serializer)

base_url = 'https://api.binance.com'
stream_url = 'wss://stream.binance.com:9443/ws'
client = Client(base_url=base_url)
ws_client = SpotWebsocketClient(stream_url=stream_url)
order_book = {
    "lastUpdateId": 0,
    "bids": [],
    "asks": []
}


def orderbook_message_handler(symbol,message): 
    
    # print(symbol,symbols_dict[symbol])
    result = producer.send("binance-orderbook",message,partition=symbols_dict[symbol])
    print("result: ",result)

def listen_binance_orderbook(symbol):
    ws_client.start()
    symbol = symbol.lower()
    ws_client.diff_book_depth(
        symbol=symbol.lower(),
        id=1,
        speed=1000,
        callback=lambda message: orderbook_message_handler(symbol, message)
    )

def load_symbols():
    with open('binance_config.json') as f:
        data = json.load(f)
    symbol_dict = {symbol: number for symbol, number in data.items()}
    return symbol_dict

symbols_dict = load_symbols()

def main():

    global symbols_dict
    try:
        admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

        topic = NewTopic(name='binance-orderbook',
                            num_partitions=len(symbols_dict),
                            replication_factor=1)
        admin.create_topics([topic])
    except Exception:
        pass

    processes = []
    for key, value in symbols_dict.items():
        # multiprocessing helps to bypass GIL 
        p = multiprocessing.Process(target=listen_binance_orderbook, args=(key,))
        processes.append(p)

    # start all processes
    for p in processes:
        p.start()

    # wait for all processes to complete
    for p in processes:
        p.join()


main()

Consumer.py

from kafka import KafkaAdminClient, KafkaConsumer
from kafka import TopicPartition
import json

consumer = KafkaConsumer(              
    bootstrap_servers=['localhost:9092'],
    group_id="strategy-two",           
    auto_offset_reset='earliest',  

)

# assign to a specific partition of the topic
partition = TopicPartition('binance-orderbook', 0)
consumer.assign([partition])

# seek to the beginning of the partition
consumer.seek_to_beginning(partition)

for msg in consumer:
    print("inside")
    print("binance orderbook = {}".format(json.loads(msg.value)))


I am trying to send messages to a Kafka topic with a Python producer, but the messages do not seem to be sent. I have a Kafka broker (localhost:9092) and ZooKeeper running locally, and I am using the kafka-python library to send the messages.

I have checked that my producer code runs without any errors, and I see the output "result: <kafka.producer.future.FutureRecordMetadata object at 0x7f37553c5a00>" when I call producer.send(). However, my consumer does not seem to receive any messages from the topic.

I have also checked that my consumer code runs without any errors, and I have assigned it to the correct topic and partition. I have set auto_offset_reset to 'earliest' so that it starts from the beginning of the topic. While Consumer.py is running it never prints "inside", which means it is not receiving any messages.

How can I debug this? What steps can I take to check whether the messages actually reach Kafka, and whether my consumer is reading from the correct topic and partition?

The consumer just sits idle without receiving any messages. I ran the producer first and then the consumer.

Any help would be appreciated. Thank you!


Solution

  • producer code is running without any errors, and I can see the output "result: <kafka.producer.future.FutureRecordMetadata object at 0x7f37553c5a00>"

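    This is an asynchronous Future. producer.send() does not make a network request to the broker itself: it appends the record to an in-memory buffer, and a background thread sends it at some later point. Until that happens nothing has reached the broker, and records still buffered when the process exits may never be delivered.

    So call producer.flush(), or result.get() on the returned Future, to confirm delivery. get() blocks until the broker acknowledges the record and returns its metadata (topic, partition, offset), or raises an exception if the send failed.

    You can debug further with kafka-consumer-groups --describe and kafka-run-class kafka.tools.GetOffsetShell, which show whether the topic actually holds any offsets to consume.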
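A minimal sketch of the synchronous check, assuming kafka-python and the localhost broker from the question; send_and_confirm and worker are hypothetical names, and the symbol-to-partition mapping is made up. It waits on the Future returned by send(), so delivery errors surface instead of disappearing. It also creates the producer inside each worker process rather than at module level. Producer.py builds one KafkaProducer before forking, and because fork() copies only the calling thread, the children may inherit a producer without the background thread that actually sends records, leaving their messages stuck in the buffer. Treat this as one plausible cause worth ruling out, not a confirmed diagnosis.

```python
import json
import multiprocessing


def json_serializer(data):
    # Same serializer as in Producer.py: dict -> UTF-8 encoded JSON bytes.
    return json.dumps(data).encode("utf-8")


def send_and_confirm(producer, topic, message, partition, timeout=10):
    """Send one record and block until the broker acknowledges it.

    FutureRecordMetadata.get() raises (for example KafkaTimeoutError) instead
    of failing silently, and returns metadata for the record that was written.
    """
    metadata = producer.send(topic, message, partition=partition).get(timeout=timeout)
    return metadata.partition, metadata.offset


def worker(symbol, partition, bootstrap="localhost:9092"):
    # Build the producer inside the child process. With the fork start method,
    # only the forking thread is copied into the child, so a KafkaProducer
    # created in the parent arrives without its background sender thread.
    from kafka import KafkaProducer  # lazy import so the helpers load without kafka-python

    producer = KafkaProducer(bootstrap_servers=[bootstrap], value_serializer=json_serializer)
    try:
        # Hypothetical payload standing in for the websocket order-book message.
        print(symbol, send_and_confirm(producer, "binance-orderbook", {"symbol": symbol}, partition))
        producer.flush()  # push out anything still buffered before the process exits
    finally:
        producer.close()


if __name__ == "__main__":
    # Hypothetical symbol -> partition mapping; Producer.py reads it from binance_config.json.
    symbols = {"btcusdt": 0, "ethusdt": 1}
    procs = [multiprocessing.Process(target=worker, args=(s, p)) for s, p in symbols.items()]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
```

If each worker prints a (partition, offset) pair, the records reached the broker and the problem is on the consumer side; if get() raises or times out, the producer side is at fault.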
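A rough Python counterpart of the GetOffsetShell check, sketched assuming kafka-python and the broker and topic from the question; inspect_topic and records_per_partition are hypothetical helper names. It asks the broker for the first and last offset of every partition of binance-orderbook, which shows whether anything was written at all and which partitions it landed in. That matters here because Consumer.py reads only partition 0, while the producer chooses the partition from binance_config.json.

```python
def records_per_partition(beginning, end):
    """Records currently retained per partition: end offset minus beginning offset.

    This is an upper bound on what a consumer starting at 'earliest' can read;
    transaction markers or log compaction can make the real count smaller.
    """
    return {tp: end[tp] - beginning[tp] for tp in end}


def inspect_topic(topic="binance-orderbook", bootstrap="localhost:9092"):
    # Lazy import so records_per_partition can be used without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    # No group_id: this consumer only queries offsets and never commits anything.
    consumer = KafkaConsumer(bootstrap_servers=[bootstrap])
    try:
        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            print(f"topic {topic!r} not found (or metadata not available yet)")
            return {}
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]
        counts = records_per_partition(consumer.beginning_offsets(tps), consumer.end_offsets(tps))
        for tp in tps:
            print(f"partition {tp.partition}: {counts[tp]} record(s)")
        return counts
    finally:
        consumer.close()
```

Calling inspect_topic() against the local broker prints one line per partition. If every partition shows 0 records, nothing reached the broker. If partition 0 is empty but others are not, the consumer in Consumer.py, which is assigned only to partition 0, will sit idle even though the producer works; assigning every partition returned by partitions_for_topic(), or calling consumer.subscribe(['binance-orderbook']) instead of assign(), would read them all.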