
KafkaProducer does not send messages in multiprocessing.Process


I have the following setup, which is a simplified version of a real app. The problem is that the producer does not send any messages when it is used inside a multiprocessing.Process.

import logging
import multiprocessing
import os

from kafka import KafkaProducer

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')

producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')


def test_produce():
    # producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    for i in range(10):
        print('inserting')  # prints only once
        producer.send(KAFKA_NAME, b'test').get()  # never committed to the topic


producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # committed to the topic
producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # committed to the topic
multiprocessing.Process(target=test_produce).start()

Solution

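  • The problem is that the producer created in the parent cannot be reused in the child process. With the fork start method (the Linux default before Python 3.14), the child gets a copy of the KafkaProducer object but not its background sender thread, because fork() copies only the thread that calls it. In the child, send() therefore only buffers the record, nothing ever transmits it, and .get() blocks forever. That is why 'inserting' is printed only once. The solution is to create a local producer instance inside the function that runs in the child process: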

    import multiprocessing
    import os
    
    from kafka import KafkaProducer
    
    KAFKA_HOST = os.getenv('KAFKA_HOST')
    KAFKA_PORT = os.getenv('KAFKA_PORT')
    KAFKA_NAME = os.getenv('KAFKA_NAME')
    BOOTSTRAP = f'{KAFKA_HOST}:{KAFKA_PORT}'
    
    
    def test_produce():
        # Create the producer inside the child so that its background
        # sender thread is started in this process.
        producer1 = KafkaProducer(bootstrap_servers=BOOTSTRAP)
        try:
            for i in range(10):
                print('inserting')  # prints 10 times
                producer1.send(KAFKA_NAME, b'test').get()  # all messages are committed
        finally:
            producer1.close()  # flush pending records and release connections
    
    
    if __name__ == '__main__':  # required with the spawn/forkserver start methods
        producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
        producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # committed to the topic
        producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # committed to the topic
        p = multiprocessing.Process(target=test_produce)
        p.start()
        p.join()
        producer.close()
    

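    With the producer created in the child, all ten messages reach the topic. Alternatively, you can use threading.Thread instead of multiprocessing.Process: a thread runs in the same process as the producer's background sender thread, and KafkaProducer is thread-safe, so a single instance can be shared across threads.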

    https://github.com/dpkp/kafka-python/issues/1416
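The fork behavior that breaks the inherited producer can be reproduced without Kafka. The sketch below is a hypothetical stand-alone check (POSIX only, because it explicitly requests the fork start method): a plain background thread stands in for KafkaProducer's internal sender thread, and the child reports whether that thread is still alive after the fork. The helper names are made up for illustration.

```python
import multiprocessing
import threading
import time


def _background_sender(stop):
    # Hypothetical stand-in for KafkaProducer's internal sender thread.
    while not stop.is_set():
        time.sleep(0.01)


def _report_in_child(queue, sender):
    # Runs in the forked child: only the thread that called fork() survives,
    # so the copied Thread object no longer has a running thread behind it.
    queue.put(sender.is_alive())


def check_thread_after_fork():
    """Return (alive_in_parent, alive_in_child) for a thread started before fork."""
    stop = threading.Event()
    sender = threading.Thread(target=_background_sender, args=(stop,), daemon=True)
    sender.start()
    try:
        ctx = multiprocessing.get_context('fork')  # POSIX only
        queue = ctx.Queue()
        child = ctx.Process(target=_report_in_child, args=(queue, sender))
        child.start()
        alive_in_child = queue.get(timeout=10)
        child.join()
        return sender.is_alive(), alive_in_child
    finally:
        stop.set()
        sender.join()
```

On Linux this is expected to report the thread as alive in the parent and not alive in the child. Python 3.12 and later may also emit a DeprecationWarning about forking a multi-threaded process, which points at the same hazard.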
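A minimal sketch of the thread-based alternative, assuming kafka-python's KafkaProducer, whose documentation describes it as thread-safe. The helpers produce_batch and produce_from_threads are hypothetical names; the producer is passed in as a parameter, so any object with send(topic, value) returning a future with .get(timeout=...), plus flush(), will work.

```python
import threading


def produce_batch(producer, topic, count):
    # Each .get() blocks until the broker acknowledges the record
    # (or raises if it is not acknowledged within the timeout).
    for _ in range(count):
        producer.send(topic, b'test').get(timeout=10)


def produce_from_threads(producer, topic, n_threads=4, per_thread=10):
    """Share ONE producer across several threads in the same process."""
    threads = [
        threading.Thread(target=produce_batch, args=(producer, topic, per_thread))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Make sure nothing is left in the producer's buffer.
    producer.flush()
```

In the app from the question, pass KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}') as producer and KAFKA_NAME as topic, and call producer.close() once all threads are done. Note that an exception raised inside a worker thread is only reported by threading's excepthook; it does not propagate to the caller.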