Tags: python, rabbitmq, python-multiprocessing, pika

How do I set up a RabbitMQ consumer to consume from a nonempty queue?


I'm working with RabbitMQ in Python, using the Pika client, to build a server that handles several message types. The setup is one queue that receives all incoming messages, a routing process that directs them to the correct destinations, and several processes that handle requests and accept incoming data. This works fine except in one case. If the RabbitMQ server is running before my server processes start and it receives messages, it correctly stores them in the incoming message queue. However, when I then start the processes and attach a consumer to that non-empty incoming queue with Pika's basic_consume function, the program hangs. So at the moment I have to purge all messages from the queues before starting my server processes, or they won't work. How do I make this work with non-empty queues?


Here's a sample of one of the processes; they are all set up essentially the same way.

import pika
from multiprocessing import Process

class Router(Process):

    def __init__(self,routing_table):
        super(Router,self).__init__()
        self.routing_table = routing_table

        self.routeQueues    = {
            'r' : 'registration',
            't' : 'util',
            'p' : 'util',
            's' : 'data'
        }

        # Create a connection to the RabbitMQ server.
        self.rabbitConn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.rabbitConn.channel()

        # Load all of the existing registered node queues
        with open('registrations/nodes.txt','r') as nodes:
            for line in nodes:
                info = line.strip().split(":")
                self.channel.queue_declare(info[1])

        # Declare the default queues
        queue_list = ["incoming","registration","util"]
        for queueName in queue_list:
            self.channel.queue_declare(queueName)

        # Start consuming things from the incoming queue
        self.channel.basic_consume(self.gotPacket,queue='incoming')

    def gotPacket(self,ch,method,params,body):
        # Does stuff. Not relevant here.
        pass

    def run(self):
        self.channel.start_consuming()

Solution

  • This issue was caused by the pika 0.9.13 library itself. Upgrading to pika 0.9.14 resolves it. @eandersson
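
To see whether an installation is affected, a quick version check may help. The following is only a rough sketch: the helper names version_tuple and has_consume_fix are made up for illustration, it assumes pika exposes its version string as pika.__version__, and it assumes that every release from 0.9.14 onward carries the fix (the answer only confirms 0.9.14 itself).

```python
def version_tuple(version):
    """Convert a dotted version string such as '0.9.14' to (0, 9, 14)."""
    parts = []
    for piece in version.split("."):
        # Keep only the leading digits so a suffix like '14b1' still parses.
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts)


def has_consume_fix(version):
    """Return True for 0.9.14 or later (hypothetical helper, see assumptions)."""
    return version_tuple(version) >= (0, 9, 14)


if __name__ == "__main__":
    try:
        import pika
    except ImportError:
        print("pika is not installed")
    else:
        # Assumption: the installed pika exposes pika.__version__.
        installed = getattr(pika, "__version__", None)
        if installed is None:
            print("could not determine the pika version")
        elif has_consume_fix(installed):
            print("pika %s should not have this problem" % installed)
        else:
            print("pika %s is affected; upgrade to 0.9.14" % installed)
```

If the check reports an affected version, pinning the release named in the answer (for example with pip install pika==0.9.14) is probably the least disruptive route. Jumping to a much newer release may need code changes as well, since pika 1.0 changed the argument order of basic_consume.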