Tags: python, python-2.7, rabbitmq, pika

Synchronous and blocking consumption in RabbitMQ using pika


I want to consume a RabbitMQ queue synchronously, blocking until a message arrives.

Note: the full, runnable code is at the end of the question.

The system uses RabbitMQ as its queuing system, but asynchronous consumption is not needed in one of our modules.

I've tried using basic_get on top of a BlockingConnection, which doesn't block (returns (None, None, None) immediately):

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

I've also tried the consume generator, which fails with "Connection Closed" after a long period without consuming:

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)

Is there a way to use RabbitMQ through the pika client the way I would use a Queue.Queue in Python, or anything similar?

My only option at the moment is to busy-wait (using basic_get), but I'd rather have the existing system do the blocking for me, if possible.

Full code:

#!/usr/bin/env python
import pika
import time

TEST_QUEUE = 'test'
def get_connection():
        # define connection
        connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                        host=YOUR_IP,
                        port=YOUR_PORT,
                        credentials=pika.PlainCredentials(
                                username=YOUR_USER,
                                password=YOUR_PASSWORD,
                        )
                )
        )
        return connection

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)


print "blocking_get_1"
blocking_get_1()

print "blocking_get_2"
blocking_get_2()

get_connection().channel().queue_delete(TEST_QUEUE)

Solution

  • A common problem with Pika is that it does not currently handle incoming events in the background. In many scenarios this means you need to call connection.process_data_events() periodically so that it does not miss heartbeats.

    It also means that if you sleep for an extended period of time, pika will not handle incoming data, and the connection will eventually die because it is not responding to heartbeats. One option here is to disable heartbeats.
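To illustrate the point about long sleeps, here is a minimal sketch of a replacement for time.sleep() that keeps calling process_data_events() while it waits. The helper name sleep_with_heartbeats and its step parameter are made up for this example; connection is assumed to be a pika BlockingConnection (or anything with a compatible process_data_events(time_limit=...) method).

```python
import time


def sleep_with_heartbeats(connection, seconds, step=1.0):
    """Wait roughly `seconds` while still letting pika handle incoming data.

    `connection` is assumed to be a pika BlockingConnection; the helper
    itself is hypothetical and not part of pika.
    """
    deadline = time.time() + seconds
    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # process_data_events blocks for at most `time_limit` seconds while
        # handling incoming frames, which includes heartbeats.
        connection.process_data_events(time_limit=min(step, remaining))
```

In the question's blocking_get_2 this would stand in for time.sleep(14400), called with the same connection object that owns the channel. BlockingConnection also has a sleep(duration) method that serves a similar purpose.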

    I usually solve this by having a thread in the background check for new events, as seen in this example.
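A rough sketch of what such a background thread could look like is below. This is not the linked example; the HeartbeatPump class and its interval parameter are hypothetical names. Pika connections are not thread-safe, so the sketch assumes the main thread also takes pump.lock around every call it makes on the connection or its channels.

```python
import threading


class HeartbeatPump(object):
    """Periodically calls process_data_events() from a background thread.

    Hypothetical helper, not part of pika. `connection` is assumed to be a
    pika BlockingConnection.
    """

    def __init__(self, connection, interval=1.0):
        self._connection = connection
        self._interval = interval
        # Shared with the main thread: hold it whenever the connection
        # or one of its channels is used.
        self.lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called.
        while not self._stop.wait(self._interval):
            with self.lock:
                self._connection.process_data_events()
```

Typical use would be pump = HeartbeatPump(connection), then pump.start() before the long-running work and pump.stop() before closing the connection.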

    If you want to block completely, I would do something like this (the snippet uses my own library, AMQPStorm, rather than pika).

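    from time import sleep

    # `channel` is assumed to be an open AMQPStorm channel.
    while True:
        # basic.get returns a message, or None when the queue is empty.
        message = channel.basic.get(queue='simple_queue', no_ack=False)
        if message:
            print("Message:", message.body)
            message.ack()
        else:
            print("Channel Empty.")
            sleep(1)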
    

    This is based on the example found here.
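If you would rather stay with pika, a different technique from the polling loop above is to keep using the consume generator from the question, but pass it inactivity_timeout so that the waiting happens inside pika instead of in time.sleep(). As far as I know, pika keeps processing incoming data (heartbeats included) while the generator waits. The sketch below is only an assumption-laden illustration: iter_messages and poll_interval are made-up names, and the value yielded on a timeout differs between pika releases, so both forms are handled.

```python
def iter_messages(channel, queue, poll_interval=1.0):
    """Yield message bodies from `queue`, blocking until each one arrives.

    Hypothetical helper. `channel` is assumed to be a pika BlockingChannel.
    """
    for item in channel.consume(queue, inactivity_timeout=poll_interval):
        # On inactivity, older pika releases yield None and newer ones
        # yield (None, None, None); keep waiting in both cases.
        if item is None or item[0] is None:
            continue
        method, properties, body = item
        # Acknowledge before handing the body to the caller, so a message
        # is never redelivered (at the cost of losing it if the caller fails).
        channel.basic_ack(method.delivery_tag)
        yield body
```

With this, next(iter_messages(channel, TEST_QUEUE)) behaves much like a blocking Queue.Queue.get(). When you are done with the generator, call channel.cancel() to stop the underlying consumer.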