
Is it possible to use RabbitMQ direct reply-to feature with a Pika generator consumer in Python?


I would like to use the direct reply-to feature of RabbitMQ with the Pika client library in Python. It works with a basic consumer, but with a generator consumer it raises the following exception:

pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - fast reply consumer does not exist')

Is there a way to use the direct reply-to feature with a generator consumer?

Sample client code using a basic consumer (it works):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.start_consuming()

Sample client code using a generator consumer (it raises the exception):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)

    for (method, properties, body) in channel.consume(
            queue="amq.rabbitmq.reply-to", auto_ack=True):
        handle(channel, method, properties, body)

Environment: Windows 10, RabbitMQ 3.7.13, CPython 3.7.3, Pika 1.0.1.

Note: In the basic consumer sample, calling basic_consume after basic_publish raises the same exception as the generator consumer does:

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.start_consuming()
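
The client samples above publish to a queue named "test" and expect some server to answer on the address carried in reply_to. That server is not shown in the question, so the following is only a minimal sketch of what it might look like. The names reply_to_request and serve, the "reply to: " prefix, and the queue_declare call are assumptions made for illustration.

```python
def reply_to_request(channel, method, properties, body):
    """Send a reply to the address the client put in reply_to (hypothetical handler)."""
    if not properties.reply_to:
        return  # the request did not ask for a reply
    # Replies go through the default exchange, routed by the reply_to value.
    channel.basic_publish(exchange="", routing_key=properties.reply_to,
                          body=b"reply to: " + body)


def serve():
    """Consume requests from the "test" queue until interrupted (assumed setup)."""
    import pika  # imported here so the handler above can be used without a broker

    connection = pika.BlockingConnection()
    channel = connection.channel()

    with connection, channel:
        channel.queue_declare(queue="test")  # assumption: the queue may not exist yet
        channel.basic_consume(queue="test",
                              on_message_callback=reply_to_request,
                              auto_ack=True)
        channel.start_consuming()
```

Calling serve() in a separate process before running a client should produce a "received: reply to: hello" line on the client side, provided a broker is reachable with Pika's default connection parameters.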

Solution

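  • As suggested by Luke Bakken, the consumer on amq.rabbitmq.reply-to has to be registered before the request is published. channel.consume is a generator, so nothing is sent to the broker until next() is called on it; calling next() once with a short inactivity_timeout registers the consumer and returns promptly. This does the trick: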

    import pika
    
    
    def handle(channel, method, properties, body):
        message = body.decode()
        print("received:", message)
    
    
    connection = pika.BlockingConnection()
    channel = connection.channel()
    
    with connection, channel:
        message = "hello"
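        # Register the consumer before publishing; with no reply pending yet,
        # this yields (None, None, None) after the 0.1 s inactivity timeout.
        next(channel.consume(queue="amq.rabbitmq.reply-to", auto_ack=True,
                             inactivity_timeout=0.1))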
        channel.basic_publish(
            exchange="", routing_key="test", body=message.encode(),
            properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
        print("sent:", message)
    
        for (method, properties, body) in channel.consume(
                queue="amq.rabbitmq.reply-to", auto_ack=True):
            handle(channel, method, properties, body)
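
The two steps in the answer (register the consumer first, then read replies) can be wrapped in small helpers. This is a sketch, not part of Pika: register_reply_consumer, wait_for_reply, main, and the 5 second default timeout are hypothetical names and values chosen for illustration. It relies on channel.consume yielding (None, None, None) when inactivity_timeout elapses.

```python
REPLY_QUEUE = "amq.rabbitmq.reply-to"


def register_reply_consumer(channel, inactivity_timeout=0.1):
    """Make the generator consumer exist before any request is published."""
    # The first next() runs the generator far enough to register the consumer;
    # with nothing to deliver yet it yields (None, None, None) after the timeout.
    next(channel.consume(queue=REPLY_QUEUE, auto_ack=True,
                         inactivity_timeout=inactivity_timeout))


def wait_for_reply(channel, timeout=5.0):
    """Return the first reply body, or None if nothing arrives within timeout."""
    for method, properties, body in channel.consume(
            queue=REPLY_QUEUE, auto_ack=True, inactivity_timeout=timeout):
        # On timeout the yielded tuple is (None, None, None), so body is None.
        return body
    return None  # the generator ended without yielding anything


def main():
    import pika  # needed only here; the helpers just use the channel they are given

    connection = pika.BlockingConnection()
    channel = connection.channel()

    with connection, channel:
        register_reply_consumer(channel)  # must come before basic_publish
        channel.basic_publish(
            exchange="", routing_key="test", body=b"hello",
            properties=pika.BasicProperties(reply_to=REPLY_QUEUE))
        reply = wait_for_reply(channel)
        print("received:", None if reply is None else reply.decode())
```

Calling main() sends one request and prints the reply, or prints None if no server answers in time. Both helpers pass the same queue and auto_ack values, since Pika rejects a second consume call on the channel whose parameters differ from those of the existing generator consumer.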