I have the following code, which is a simplified version of a real app. The problem is that the producer does not send any messages when it is used inside a multiprocessing.Process:
```python
import multiprocessing
import os

from kafka import KafkaProducer

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')

producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')


def test_produce():
    # producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    for i in range(10):
        print('inserting')  # prints only once
        producer.send(KAFKA_NAME, b'test').get()  # never written to the topic


producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # written to the topic
producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # written to the topic
multiprocessing.Process(target=test_produce).start()
```
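The problem is not really serialization. KafkaProducer does its network I/O on a background thread that is started when the producer is created, i.e. in the parent process. With the `fork` start method (the traditional default on Linux), the child process gets a copy of the `producer` object but not that thread, so the message sent in the child is only queued and never delivered, and `.get()` keeps waiting (which is why 'inserting' is printed only once). The solution is to create a separate producer instance inside the function that runs in the child process: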
```python
import multiprocessing
import os

from kafka import KafkaProducer

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')


def test_produce():
    # Created inside the child, so its background I/O thread runs in this process.
    producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    for i in range(10):
        print('inserting')  # prints 10 times
        producer1.send(KAFKA_NAME, b'test').get()  # all messages are written to the topic
    producer1.close()


if __name__ == '__main__':
    # The parent keeps its own producer; the child never touches it.
    producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # written to the topic
    producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # written to the topic

    process = multiprocessing.Process(target=test_produce)
    process.start()
    process.join()
    producer.close()
```
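This way everything works as expected. Alternatively, you can use threading.Thread instead of multiprocessing.Process: threads run in the same process, so they share the original producer together with its background I/O thread, and KafkaProducer is documented as thread-safe, so this issue does not occur.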
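The difference between threads and a forked process can be reproduced without a Kafka broker. The sketch below assumes a hypothetical `ToyProducer` class (it is not the kafka-python API) that imitates only the relevant part of KafkaProducer's design: `send()` merely enqueues a message and returns a future, and a background sender thread started in the constructor does the "delivery". `Future.result()` plays the role of kafka-python's `.get()`. The fork check forces the `fork` start method, so it only runs on POSIX systems.

```python
import multiprocessing
import queue
import threading
from concurrent.futures import Future


class ToyProducer:
    """Hypothetical stand-in for KafkaProducer (NOT the kafka-python API).

    send() only enqueues a message; a background sender thread started in
    __init__ "delivers" it (appends it to a list) and completes the future.
    """

    def __init__(self) -> None:
        self.delivered: list = []
        self._queue: queue.Queue = queue.Queue()
        self._sender = threading.Thread(target=self._run, name='sender', daemon=True)
        self._sender.start()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:  # shutdown sentinel from close()
                return
            message, future = item
            self.delivered.append(message)
            future.set_result(len(self.delivered))

    def send(self, message: bytes) -> Future:
        future: Future = Future()
        self._queue.put((message, future))
        return future

    def close(self) -> None:
        self._queue.put(None)
        self._sender.join()


def send_from_threads(producer: ToyProducer, n_threads: int = 4, per_thread: int = 10) -> None:
    # Every thread shares the same producer and therefore the same sender thread.
    def worker(thread_id: int) -> None:
        for i in range(per_thread):
            # Blocks until the shared sender thread has delivered the message.
            producer.send(f'{thread_id}-{i}'.encode()).result(timeout=5)

    threads = [threading.Thread(target=worker, args=(t,)) for t in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def _report_sender(producer: ToyProducer, results) -> None:
    # Runs in the forked child, which received a copy of the producer object.
    results.put(producer._sender.is_alive())


def sender_alive_in_forked_child(producer: ToyProducer) -> bool:
    ctx = multiprocessing.get_context('fork')  # POSIX only
    results = ctx.Queue()
    child = ctx.Process(target=_report_sender, args=(producer, results))
    child.start()
    alive = results.get(timeout=10)
    child.join()
    return alive


if __name__ == '__main__':
    producer = ToyProducer()
    send_from_threads(producer)
    print(len(producer.delivered))                 # 40
    print(producer._sender.is_alive())             # True in the parent
    print(sender_alive_in_forked_child(producer))  # False in the forked child
    producer.close()
```

Because the forked child's copy has no running sender thread, anything it sends just sits in the queue, which matches the behaviour in the question. Creating the producer inside the child, as in the fixed code, starts a sender thread in the process that actually uses it.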