I have the following code, where I initialize a consumer that listens to a queue:
consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
    exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)
The problem is that the consume method is defined as follows:
async def consume(self, channel, method, properties, body):
Inside the consume method we need to await async functions, but passing it as the callback produces a "coroutine is not awaited" error for the consume function. Is there a way to use an async function as a callback in pika?
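To illustrate where the warning comes from, here is a minimal sketch that does not use pika at all. `fake_dispatch` is a hypothetical stand-in for a library that calls its callback as a plain function; the real pika internals are not shown here. Calling an `async def` function this way only creates a coroutine object, and the body does not run until something awaits it or drives it on an event loop.

```python
import asyncio
import inspect

ran = []  # records which message bodies were actually processed


async def consume(channel, method, properties, body):
    ran.append(body)


def fake_dispatch(callback, body):
    # Hypothetical stand-in for a library that invokes the callback
    # synchronously, the way a plain (non-async) callback is expected.
    return callback(None, None, None, body)


result = fake_dispatch(consume, b"job-1")

is_coroutine = inspect.iscoroutine(result)  # a coroutine object came back
body_ran_before = list(ran)                 # the body has not executed yet

asyncio.run(result)                         # driving the coroutine runs the body
body_ran_after = list(ran)
```

If the coroutine object is simply dropped instead of being run, Python reports that it was never awaited.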
I decorated my callback with @sync, where sync is:
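import asyncio
import functools

def sync(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Run the coroutine to completion so the caller sees a normal function call.
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
    return wrapper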
(I found it here for Celery, but it worked with pika too.)
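For completeness, here is a self-contained sketch of the decorator applied to a consumer method. It is only an illustration: this `MyConsumer` is a stripped-down stand-in, not the real class, and the callback is invoked by hand instead of by pika. As an assumption of the sketch, it creates one event loop up front with `asyncio.new_event_loop()` and reuses it, rather than calling `asyncio.get_event_loop()` on every message.

```python
import asyncio
import functools

# One loop created up front and reused for every callback invocation
# (an assumption of this sketch, not part of the original decorator).
_loop = asyncio.new_event_loop()


def sync(f):
    """Wrap coroutine function f so it can be called like a plain function."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Block until the coroutine finishes and hand back its result.
        return _loop.run_until_complete(f(*args, **kwargs))
    return wrapper


class MyConsumer:
    """Stripped-down stand-in for the consumer class in the question."""

    def __init__(self):
        self.handled = []

    @sync
    async def consume(self, channel, method, properties, body):
        await asyncio.sleep(0)  # placeholder for the real awaited work
        self.handled.append(body)
        return body


consumer = MyConsumer()
# Invoked by hand here; in the question, pika would make this call.
result = consumer.consume(None, None, None, b"job-1")
```

Note that `run_until_complete` blocks until the coroutine has finished, so messages are still handled one at a time, and it raises a RuntimeError if an event loop is already running in the calling thread.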