Search code examples
pythonapache-kafkakafka-python

Unable to send messages to topic in Kafka Python


I have a producer code where am sending messages to Kafka. I was able to send messages till yesterday. From today I am unable to send messages. Not sure if it's version compatible issue. There are no failures or error messages, code gets executed, but it's not sending messages.

Below are the Python module versions:

kafka-python==2.0.1
Python 3.8.2

Below is my code:

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')

I tries logging the behavior as well, but no idea why producer gets closed:

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed

Process finished with exit code 0

Solution

  • Adding producer.flush() at the end helped me to resolve the issues. Any outstanding messages will be flushed (delivered) before actually committing the transaction

    from kafka import KafkaProducer
    import logging
    logging.basicConfig(level=logging.INFO)
    
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
    producer.send('Jim_Topic', b'Message from PyCharm')
    producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
    producer.flush()