I would like to use the direct reply-to feature of RabbitMQ with the Pika client library in Python. It works with a basic consumer. But it raises the following exception with a generator consumer:
pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - fast reply consumer does not exist')
Is there a way to use the direct reply-to feature with a generator consumer?
Sample client code using a basic consumer (it works):
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
channel.basic_consume(queue="amq.rabbitmq.reply-to",
on_message_callback=handle, auto_ack=True)
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
channel.start_consuming()
Sample client code using a generator consumer (it raises the exception):
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
for (method, properties, body) in channel.consume(
queue="amq.rabbitmq.reply-to", auto_ack=True):
handle(channel, method, properties, body)
Environment. — Windows 10, RabbitMQ 3.7.13, CPython 3.7.3, Pika 1.0.1.
Note. — Calling the basic_consume
method after the basic_publish
method in the sample client code using a basic consumer raises the same exception as when using a generator consumer:
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
channel.basic_consume(queue="amq.rabbitmq.reply-to",
on_message_callback=handle, auto_ack=True)
channel.start_consuming()
As suggested by Luke Bakken here, this does the trick:
import pika
def handle(channel, method, properties, body):
message = body.decode()
print("received:", message)
connection = pika.BlockingConnection()
channel = connection.channel()
with connection, channel:
message = "hello"
next(channel.consume(queue="amq.rabbitmq.reply-to", auto_ack=True,
inactivity_timeout=0.1))
channel.basic_publish(
exchange="", routing_key="test", body=message.encode(),
properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
print("sent:", message)
for (method, properties, body) in channel.consume(
queue="amq.rabbitmq.reply-to", auto_ack=True):
handle(channel, method, properties, body)