Search code examples
pythonmultithreadingrabbitmqmultiprocessingpika

Multiple consumer Rabbitmq through multiprocessing


I am trying to create multiple consumer for a RabbitMQ client. I am using PIKA and trying to do with multiprocessing. It seems connecting but not being able to sustain the loop. Can you please help. The part of the code should also take care the writer option through the call back.

it should start the loop and should consume always

import multiprocessing
import time
import pika
# this is the writer part
def callback(ch, method, properties, body):
    print (" [x] %r received %r" % (multiprocessing.current_process(), body,))
    time.sleep(body.count('.'))
    # print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume():
    credentials = pika.PlainCredentials(userid, password)
    parameters = pika.ConnectionParameters(url, port, '/', credentials)
    connection = pika.BlockingConnection(
        parameters=parameters)
    channel = connection.channel()
    channel.queue_declare(queue='queuename', durable=True)
    channel.basic_consume('queuename',callback)
    print (' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

userid = "user"
password = "pwd"
url = "localhost"
port = 5672

if __name__ == "__main__":

    workers = 5
    pool = multiprocessing.Pool(processes=workers)
    for i in range(0, workers):
        pool.apply_async(consume)

    #Stay alive
    try:
        while True:


Solution

  • You aren't doing any exception handling in your sub-processes, so my guess is that exceptions are being thrown that you don't expect. This code works fine in my environment, using Pika 1.1.0 and Python 3.7.3.

    Before I checked for exceptions in body.count() a TypeError would be thrown because body was not a str in that case.

    Please note that I'm using the correct method to wait for sub-processes, according to these docs.