
Python - RabbitMQ Pika consumer - How to use an async function as a callback


I have the following code where I initialize a consumer listening to a queue.

consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
    exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)

The problem is that the consume method is defined as follows:

async def consume(self, channel, method, properties, body):

Inside the consume method we need to await other async functions, but passing it as the callback produces a "coroutine ... was never awaited" warning for consume, and the body of the method never runs. Is there a way to use an async function as a callback in pika?
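A minimal sketch of the problem, without pika: dispatch below is a hypothetical stand-in for any library that invokes its callback as a plain function, which is presumably what happens to consume here. Calling an async function this way only creates a coroutine object; nothing inside it runs.

```python
import asyncio
import inspect


async def consume(channel, method, properties, body):
    # Placeholder for real async work (hypothetical).
    await asyncio.sleep(0)
    return body.upper()


def dispatch(callback, body):
    """Hypothetical stand-in for a library that calls callbacks synchronously."""
    return callback(None, None, None, body)


result = dispatch(consume, b"job-1")
print(inspect.iscoroutine(result))  # True: a coroutine object, not the result

# Close the coroutine explicitly so Python does not warn that it was
# never awaited when it is garbage collected.
result.close()
```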


Solution

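  • I decorated my callback with @sync, where sync is:

    import asyncio
    import functools

    def sync(f):
        """Wrap coroutine function f so it can be called synchronously."""
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # Block until the coroutine finishes, then return its result.
            return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
        return wrapper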
    

    (I found it here for Celery, but it works with pika too.)
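For reference, a self-contained sketch of the decorator applied to a method. It makes a few assumptions: this MyConsumer is a cut-down, hypothetical version of the class in the question, the direct call at the bottom stands in for pika invoking the callback, and the loop is created explicitly with asyncio.new_event_loop() because newer Python versions deprecate calling asyncio.get_event_loop() when no loop is set.

```python
import asyncio
import functools

# One event loop reused for every message (an assumption of this sketch).
_loop = asyncio.new_event_loop()


def sync(f):
    """Wrap coroutine function f so it can be called synchronously."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Block the calling thread until the coroutine completes.
        return _loop.run_until_complete(f(*args, **kwargs))
    return wrapper


class MyConsumer:
    """Hypothetical, cut-down consumer; the real class is not shown."""

    def __init__(self):
        self.seen = []

    @sync
    async def consume(self, channel, method, properties, body):
        await asyncio.sleep(0)  # placeholder for real async work
        self.seen.append(body)
        return len(self.seen)


consumer = MyConsumer()
# Stand-in for pika calling the callback with its four arguments.
count = consumer.consume(None, None, None, b"job-1")
print(count, consumer.seen)
```

Note that the wrapper blocks until the coroutine has finished, so messages are handled one at a time, and it cannot be called from code that is already running inside an event loop, since run_until_complete raises RuntimeError in that case.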