Search code examples
pythonrabbitmqpikapython-pika

Multiple consumer in rabbitmq for multiple queue


I have 2 queues, say q1 and q2, which corresponds to e1 and e2 exchanges with binding key b1 and b2. I want to run consumer functions in parallel, say c1 and c2 which will listen to q1 and q2 respectively. I tried the following way:

def c1():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e1', durable='true',
                         type='topic')
    result = channel.queue_declare(durable='false', queue='q1')
    queue_name = result.method.queue
    binding_key = "b1"
    channel.queue_bind(exchange='e1',
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(callback,queue=queue_name,no_ack=False)
    channel.start_consuming()

def c2():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e2', durable='true',
                         type='topic')
    result = channel.queue_declare(durable='false', queue='q2')
    queue_name = result.method.queue
    binding_key = "b2"
    channel.queue_bind(exchange=e1,
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(callback,queue=queue_name,no_ack=False)
    channel.start_consuming()

if __name__ == '__main__':
    c1()
    c2()

However, it is only listening to c1 function and c2 function, it is not getting executed. How can I run the both functions? Thanks in advance.

EDIT: I have method c1 and c1 in 2 different module(file)


Solution

  • In order to run both functions simultaneously some multi threading method needs to be in order. Please have a look here for some python examples.

    Here is your code modified with the Process class. It can also use thread or run it explicitly from the OS.

    import pika
    from multiprocessing import Process
    
    
    def callback():
        print 'callback got data'
    
    
    class c1():
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
            self.channel.exchange_declare(exchange='e1', durable='true', type='topic')
            result = self.channel.queue_declare(durable='false', queue='q1')
            queue_name = result.method.queue
            binding_key = "b1"
            self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key)
            self.channel.basic_consume(callback,queue=queue_name,no_ack=False)
    
        def run(self):
            self.channel.start_consuming()
    
    
    class c2():
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
            self.channel.exchange_declare(exchange='e2', durable='true', type='topic')
            result = self.channel.queue_declare(durable='false', queue='q2')
            queue_name = result.method.queue
            binding_key = "b2"
            self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key)
    
            self.channel.basic_consume(callback,queue=queue_name,no_ack=False)
    
        def run(self):
            self.channel.start_consuming()
    
    if __name__ == '__main__':
        subscriber_list = []
        subscriber_list.append(c1())
        subscriber_list.append(c2())
    
        # execute
        process_list = []
        for sub in subscriber_list:
            process = Process(target=sub.run)
            process.start()
            process_list.append(process)
    
        # wait for all process to finish
        for process in process_list:
            process.join()