Search code examples
pythonrabbitmqpika

Rabbitmq: channel.start_consuming() wont consume


I am working with rabbitmq. Code is stuck at channel.start_consuming(). There are messages in queue.

What could be the problem. When I forcefully end the code using ctrl+C :

INFO:pika.adapters.base_connection:Connecting fd 4 to localhost:5672
INFO:pika.adapters.blocking_connection:Adapter connected
 [*] Waiting for messages. To exit press CTRL+C



^CTraceback (most recent call last):
  File "recipe_code.py", line 293, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 722, in start_consuming
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
    super(BlockingConnection, self)._handle_read()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 296, in _handle_read
    data = self.socket.recv(self._buffer_size)
KeyboardInterrupt

recipe_code.py is a worker:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='myqueue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):

//do some work//

  ch.basic_ack(delivery_tag = method.delivery_tag)

  channel.basic_qos(prefetch_count=1)
  channel.basic_consume(callback,queue='myqueue')

channel.start_consuming()

Solution

  • The only issue that I see with your code is the indent on basic_qos and basic_consume. If the code you posted is correct, those two functions would never called.

    connection = \
        pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    
    def callback(ch, method, properties, body):
        print "Message:", body
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.queue_declare(queue='myqueue', durable=True)
    # You had an unwanted indent here.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='myqueue')
    
    print ' [*] Waiting for messages. To exit press CTRL+C'
    channel.start_consuming()
    

    The printed message you have should also be just above the start_consuming line, as that is when pika will actually start listening for messages to consume.