Search code examples
python-3.xcelerypython-asynciocelery-taskaioredis

How to use asyncio and aioredis lock inside celery tasks?


Goal:

  1. Possibility to run asyncio coroutines.
  2. Correct celery behavior on exceptions and task retries.
  3. Possibility to use aioredis lock.

So, how to run async tasks properly to achieve the goal?

What is RuntimeError: await wasn't used with future (below), how can I fix it?


I have already tried:

1. asgiref

async_to_sync (from asgiref https://pypi.org/project/asgiref/).

This option makes it possible to run asyncio coroutines, but retries functionality doesn't work.

2. celery-pool-asyncio

(https://pypi.org/project/celery-pool-asyncio/)

Same problem as in asgiref. (This option makes it possible to run asyncio coroutines, but retries functionality doesn't work.)

3. write own async to sync decorator

I have performed try to create my own decorator like async_to_sync that runs coroutines threadsafe (asyncio.run_coroutine_threadsafe), but I have behavior as I described above.

4. asyncio module

Also I have try asyncio.run() or asyncio.get_event_loop().run_until_complete() (and self.retry(...)) inside celery task. This works well, tasks runs, retries works, but there is incorrect coroutine execution - inside async function I cannot use aioredis.

Implementation notes:

  • start celery command: celery -A celery_test.celery_app worker -l info -n worker1 -P gevent --concurrency=10 --without-gossip --without-mingle
  • celery app:
transport = f"redis://localhost/9"
celery_app = Celery("worker", broker=transport, backend=transport,
                    include=['tasks'])

celery_app.conf.broker_transport_options = {
    'visibility_timeout': 60 * 60 * 24,
    'fanout_prefix': True,
    'fanout_patterns': True
}
  • utils:
@contextmanager
def temp_asyncio_loop():
    # asyncio.get_event_loop() automatically creates event loop only for main thread
    try:
        prev_loop = asyncio.get_event_loop()
    except RuntimeError:
        prev_loop = None
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        yield loop
    finally:
        loop.stop()
        loop.close()
        del loop
        asyncio.set_event_loop(prev_loop)


def with_temp_asyncio_loop(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        with temp_asyncio_loop() as t_loop:
            return f(*args, loop=t_loop, **kwargs)

    return wrapper


def await_(coro):
    return asyncio.get_event_loop().run_until_complete(coro)
  • tasks:

@celery_app.task(bind=True, max_retries=30, default_retry_delay=0)
@with_temp_asyncio_loop
def debug(self, **kwargs):
    try:
        await_(debug_async())
    except Exception as exc:
        self.retry(exc=exc)


async def debug_async():
    async with RedisLock(f'redis_lock_{datetime.now()}'):
        pass
  • redis lock

class RedisLockException(Exception):
    pass


class RedisLock(AsyncContextManager):
    """
    Redis Lock class

    :param lock_id: string (unique key)
    :param value: dummy value
    :param expire: int (time in seconds that key will storing)
    :param expire_on_delete: int (time in seconds, set pause before deleting)

        Usage:
            try:
                with RedisLock('123_lock', 5 * 60):
                    # do something
            except RedisLockException:
    """

    def __init__(self, lock_id: str, value='1', expire: int = 4, expire_on_delete: int = None):
        self.lock_id = lock_id
        self.expire = expire
        self.value = value
        self.expire_on_delete = expire_on_delete

    async def acquire_lock(self):
        return await redis.setnx(self.lock_id, self.value)

    async def release_lock(self):
        if self.expire_on_delete is None:
            return await redis.delete(self.lock_id)
        else:
            await redis.expire(self.lock_id, self.expire_on_delete)

    async def __aenter__(self, *args, **kwargs):
        if not await self.acquire_lock():
            raise RedisLockException({
                'redis_lock': 'The process: {} still run, try again later'.format(await redis.get(self.lock_id))
            })
        await redis.expire(self.lock_id, self.expire)

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.release_lock()

On my windows machine await redis.setnx(...) blocks celery worker and it stops producing logs and Ctrl+C doesn't work.

Inside the docker container, I receive an error. There is part of traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 854, in read_response
    response = await self._parser.read_response()
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 366, in read_response
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
aioredis.exceptions.ConnectionError: Connection closed by server.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 54, in run
    ret = task.retry(exc=exc, **retry_kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/app/task.py", line 717, in retry
    raise_with_context(exc)
  File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 34, in run
    return task._orig_run(*args, **kwargs)
  File "/app/celery_tasks/tasks.py", line 69, in wrapper
    return f(*args, **kwargs) # <--- inside with_temp_asyncio_loop from utils
  ...
  File "/usr/local/lib/python3.9/contextlib.py", line 575, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/app/db/redis.py", line 50, in __aenter__
    if not await self.acquire_lock():
  File "/app/db/redis.py", line 41, in acquire_lock
    return await redis.setnx(self.lock_id, self.value)
  File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1064, in execute_command
    return await self.parse_response(conn, command_name, **options)
  File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1080, in parse_response
    response = await connection.read_response()
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 859, in read_response
    await self.disconnect()
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 762, in disconnect
    await self._writer.wait_closed()
  File "/usr/local/lib/python3.9/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: await wasn't used with future
  • library versions:
celery==5.2.1
aioredis==2.0.0

Solution

  • Use solo pool, then create a decorator which run task function asyncio.get_event_loop().run_until_complete(f(*args, **kwargs)) and make your task asynchronous

    def sync(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
        return wrapper
    
    @celery_app.task()
    @sync
    async def task():
       ...