Tags: python, redis, python-asyncio, aioredis

asyncio python coroutine cancelled while task still pending reading from redis channel


I have multiple coroutines, each of which waits for content in a queue before it starts processing.

The queues are populated by channel subscribers whose only job is to receive messages and push an item into the appropriate queue.

Once a queue processor has consumed the data and generated new data, the new data is dispatched to the appropriate message channel. This process repeats until the data is ready to be relayed to an API that provisions it.

import asyncio
from random import randint
from Models.ConsumerStrategies import Strategy
from Helpers.Log import Log
import Connectors.datastore as ds

import json
__name__ = "Consumer"

MIN = 1
MAX = 4

async def consume(configuration: dict, queue: str, processor: Strategy) -> None:
    """Consumes new items in queue and publish a message into the appropriate channel with the data generated for the next consumer,
    if no new content is available sleep for a random number of seconds between MIN and MAX global variables

    Args:
        configuration (dict): configuration dictionary
        queue (str): queue being consumed
        processor (Strategy): consumer strategy
    """
    
    logger = Log().get_logger(processor.__name__, configuration['logFolder'], configuration['logFormat'], configuration['USE'])
    while True:
        try:
            ds_handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
            token = await ds_handle.lpop(queue)
            if token is not None:
                result = await processor.consume(json.loads(token), ds_handle)
                status = await processor.relay(result, ds_handle)
                logger.debug(status)
            else:
                wait_for = randint(MIN,MAX)
                logger.debug(f'queue: {queue} empty waiting: {wait_for} before retry')
                await asyncio.sleep(wait_for)
            ds_handle.close()
        except Exception as e:
            logger.error(f"{e}")
            logger.error(f"{e.with_traceback}")

What I'm noticing is that after a 24h run I'm getting these errors:

Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<consume() running at Services/Consumer.py:26> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29cbe0>()]> cb=[_chain_future.<locals>._call_set_state() at asyncio/futures.py:391]>
Task was destroyed but it is pending!
task: <Task pending name='Task-426485' coro=<RedisConnection._read_data() done, defined at aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29ccd0>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at aioredis/connection.py:168]>

I'm not sure how to interpret, resolve or recover from these. My first assumption is that I should probably switch to Redis streams instead of using channels and queues.

Going back to this scenario, though: the channel subscribers run in different processes, while the consumers run in the same process as different tasks in the loop.

My assumption is that, since the consumer is basically polling a queue, at some point the connection pool manager or Redis itself starts hanging up on the connection the consumer has open, and the consumer gets cancelled.

I think so because I don't see any further messages from that queue processor. I also see the pending wait_for=<Future ...> in the output, and I'm not sure whether it comes from the subscriber's ensure_future call on the message reader:

import asyncio
from multiprocessing import process
from Helpers.Log import Log
import Services.Metas as metas
import Models.SubscriberStrategies as processor
import Connectors.datastore as ds_linker
import Models.Exceptions as Exceptions

async def subscriber(conf: dict, channel: str, processor: processor.Strategy) -> None:
    """Subscription handler. Receives the channel name, datastore connection and a parsing strategy.
    Creates a task that listens on the channel and process every message and processing strategy for the specific message

    Args:
        conf (dict): configuration dictionary
        channel (str): channel to subscribe to
        ds (aioredis.connection): connection handler to datastore
        processor (processor.Strategy): processor message handler
    """
    async def reader(ch):
        while await ch.wait_message():
            msg = await ch.get_json()
            await processor.handle_message(msg=msg)

    ds_uri = ds_linker.get_uri(conf=conf)
    ds = await ds_linker.get_datastore_handle(ds_uri)
    pub = await ds.subscribe(channel)
    ch = pub[0]
    tsk = asyncio.ensure_future(reader(ch))
    await tsk

I could use some help sorting this out and properly understanding what's happening under the hood. Thanks.


Solution

  • It took a few days just to reproduce the issue. I found people with the same problem in the issues of the aioredis GitHub repo.

    So I went through every place where a connection to Redis is opened or closed and made sure that each close is followed by wait_closed():

            ds_handle.close()
            await ds_handle.wait_closed()
    

    I also proceeded to improve the exception management in the consumer:

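    # requires "import traceback" at module level
    while True:
        ds_handle = None  # stays None if opening the connection fails
        try:
            ds_handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
            token = await ds_handle.lpop(queue)
            if token is not None:
                result = await processor.consume(json.loads(token), ds_handle)
                status = await processor.relay(result, ds_handle)
                logger.debug(status)
            else:
                wait_for = randint(MIN, MAX)
                logger.debug(f'queue: {queue} empty waiting: {wait_for} before retry')
                await asyncio.sleep(wait_for)
        except Exception as e:
            logger.error(f"{e}")
            # format_exc() returns the traceback as a string; print_exc() returns None
            logger.error(traceback.format_exc())
        finally:
            # close() only starts the shutdown; wait_closed() waits until it completes
            if ds_handle is not None:
                ds_handle.close()
                await ds_handle.wait_closed()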
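
The close-then-wait pattern can also be wrapped in an async context manager, so it cannot be forgotten at any call site. This is only a minimal sketch and not part of the original fix: FakeHandle is a hypothetical stand-in for the aioredis 1.3.x handle so the example runs without Redis, and datastore and open_handle are names made up for illustration.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeHandle:
    """Hypothetical stand-in for an aioredis 1.3.x handle (no Redis needed)."""

    def __init__(self):
        self.close_called = False
        self.fully_closed = False

    def close(self):
        # Mirrors the two-step shutdown: close() only starts it.
        self.close_called = True

    async def wait_closed(self):
        # The shutdown counts as complete only once this has been awaited.
        await asyncio.sleep(0)
        self.fully_closed = True


@asynccontextmanager
async def datastore(open_handle):
    """Open a handle, yield it, then always close it and wait for the close."""
    handle = await open_handle()  # if this raises, there is nothing to close
    try:
        yield handle
    finally:
        handle.close()
        await handle.wait_closed()


async def demo():
    created = []

    async def open_handle():
        handle = FakeHandle()
        created.append(handle)
        return handle

    try:
        async with datastore(open_handle):
            # Simulate a failure in the middle of processing.
            raise RuntimeError("processing failed")
    except RuntimeError:
        pass
    return created[0]


handle = asyncio.run(demo())
print(handle.close_called, handle.fully_closed)
```

Even though the body raises, the handle ends up both closed and fully shut down, which is the property the finally block above is meant to guarantee.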
    

    I did the same for the subscriber:

    ds = None  # stays None if opening the connection fails
    try:
        async def reader(ch):
            while await ch.wait_message():
                msg = await ch.get_json()
                await processor.handle_message(msg=msg)

        ds_uri = ds_linker.get_uri(conf=conf)
        ds = await ds_linker.get_datastore_handle(ds_uri)
        pub = await ds.subscribe(channel)
        ch = pub[0]
        tsk = asyncio.ensure_future(reader(ch))
        await tsk
    except Exception as e:
        logger.debug(f'{e}')
        logger.error(traceback.format_exc())
    finally:
        # close() only starts the shutdown; wait_closed() waits until it completes
        if ds is not None:
            ds.close()
            await ds.wait_closed()
    

    This way no connection to Redis is ever left open, which over time could otherwise end up killing one of the processors' coroutines.
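
On the error message itself: asyncio logs "Task was destroyed but it is pending!" when a task object is garbage-collected before it has finished. The sketch below shows a general asyncio pattern that is not part of the fix above: cancel the tasks at shutdown and wait for them, so none is left pending. The reader and shutdown functions are hypothetical, and the reader simply blocks on an event instead of reading from a real Redis channel.

```python
import asyncio


async def reader(log):
    """Hypothetical reader that waits forever, like a subscriber with no traffic."""
    try:
        await asyncio.Event().wait()  # never set, so this blocks until cancelled
    finally:
        # Runs on cancellation too; this is where a connection would be closed.
        log.append("reader cleaned up")


async def shutdown(tasks):
    """Cancel every task and wait until each one has actually finished."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it here.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    log = []
    task = asyncio.ensure_future(reader(log))
    await asyncio.sleep(0)  # let the reader start and block on its wait
    results = await shutdown([task])
    return task, results, log


task, results, log = asyncio.run(main())
print(task.cancelled(), log)
```

Because shutdown awaits the cancelled tasks, each one gets the chance to run its finally block before the loop goes away.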

    For me this solved the issue: at the time of writing, the service has been up for more than 2 weeks with no further incidents of this kind.

    Also note that a new major release of aioredis came out very recently. The code above was written against 1.3.1, and 2.0.0 should follow the same model as redis-py, so things may have changed by the time you read this.