Tags: python, error-handling, python-asyncio

How to efficiently handle nested asynchronous operations with Python's asyncio library?


I'm working on a Python project that has nested asyncio operations. I want to make sure each level has good error handling and that control comes back to the main loop for repeated execution. In particular, if any error occurs, I want the pending tasks to be cancelled.

import asyncio

async def fetch_data_from_service(service_name):
    await asyncio.sleep(1)  # Simulating I/O operation
    if service_name == "ServiceB":
        raise Exception(f"Error fetching data from {service_name}")
    return f"Data from {service_name}"

async def process_data(data):
    await asyncio.sleep(1)  # Simulating data processing
    if data == "Data from ServiceC":
        raise Exception("Error processing data from ServiceC")
    return f"Processed {data}"

async def main_task():
    try:
        dataA = await fetch_data_from_service("ServiceA")
        dataB = await fetch_data_from_service("ServiceB")
        dataC = await fetch_data_from_service("ServiceC")
        
        processedA = await process_data(dataA)
        processedB = await process_data(dataB)
        processedC = await process_data(dataC)

        print(processedA, processedB, processedC)
    except Exception as e:
        print(f"Exception caught in main_task: {e}")
        # How to ensure all pending tasks are canceled if an error occurs?
        # How to propagate this error back to the main event loop?

# Running the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main_task())
    except Exception as e:
        print(f"Exception caught in event loop: {e}")

I want efficient handling of the nested operations, error checking at each level, cancellation of pending tasks if an error occurs, and propagation of the error back to the main event loop.

I wrapped the operations in a try-except block to catch and manage errors, but I'm not sure about the cancellation process.

I expect to manage the async tasks cleanly: even if an error occurs, all pending tasks should be cancelled and the error should be passed back to the main event loop.

Please help me with this.


Solution

  • My first observation is that, as currently written, your code does not take advantage of asyncio: each coroutine is awaited to completion before the next one is even started. That is, you never have multiple asyncio tasks running concurrently, so there is no overlap in processing. We will remedy this by creating three tasks that run concurrently. Since you want an exception raised in one task to cause the termination of the other tasks, I suggest using a task group (asyncio.TaskGroup, available since Python 3.11). The code can also be simplified by having the fetch coroutine call process_data directly, so fetch_data_from_service has been renamed fetch_and_process_data_from_service. If process_data is CPU-intensive, consider running it in a separate process using run_in_executor with a ProcessPoolExecutor.

    import asyncio
    
    async def fetch_and_process_data_from_service(service_name):
        await asyncio.sleep(1)  # Simulating I/O operation
        if service_name == "ServiceB":
            raise Exception(f"Error fetching data from {service_name}")
        return await process_data(f"Data from {service_name}")
    
    async def process_data(data):
        await asyncio.sleep(1)  # Simulating data processing
        if data == "Data from ServiceC":
            raise Exception("Error processing data from ServiceC")
        return f"Processed {data}"
    
    async def main_task():
        try:
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(fetch_and_process_data_from_service(service_name))
                    for service_name in ("ServiceA", "ServiceB", "ServiceC")
                ]
    
            results = [
                task.result() for task in tasks
            ]
            print(results)
        except Exception as e:
            print(f"Exception caught in main_task: {e}")
            raise  # propagate the exception back
    
    # Running the event loop
    if __name__ == "__main__":
        try:
            asyncio.run(main_task())
        except Exception as e:
            print(f"Exception caught in event loop: {e}")
    

    Prints:

    Exception caught in main_task: unhandled errors in a TaskGroup (1 sub-exception)
    Exception caught in event loop: unhandled errors in a TaskGroup (1 sub-exception)
    

    Note, however, that the printed message doesn't tell us which task failed and why. If this is important, we can instead schedule the coroutines to run as concurrent tasks using `asyncio.gather`, but then it becomes necessary to explicitly cancel the uncompleted tasks if one of them raises an exception:

    import asyncio
    
    async def fetch_and_process_data_from_service(service_name):
        await asyncio.sleep(1)  # Simulating I/O operation
        if service_name == "ServiceB":
            raise Exception(f"Error fetching data from {service_name}")
        return await process_data(f"Data from {service_name}")
    
    async def process_data(data):
        await asyncio.sleep(1)  # Simulating data processing
        if data == "Data from ServiceC":
            raise Exception("Error processing data from ServiceC")
        return f"Processed {data}"
    
    async def main_task():
        tasks = [
            asyncio.create_task(fetch_and_process_data_from_service(service_name))
            for service_name in ("ServiceA", "ServiceB", "ServiceC")
        ]
    
        try:
            results = await asyncio.gather(*tasks)
            print(results)
        except Exception as e:
            print(f"Exception caught in main_task: {e}")
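            for task in tasks:
                if not task.done():
                    task.cancel()
            # Wait until the cancellations have actually taken effect
            await asyncio.gather(*tasks, return_exceptions=True)
            raise  # propagate the exception back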
    
    # Running the event loop
    if __name__ == "__main__":
        try:
            asyncio.run(main_task())
        except Exception as e:
            print(f"Exception caught in event loop: {e}")
    

    Prints:

    Exception caught in main_task: Error fetching data from ServiceB
    Exception caught in event loop: Error fetching data from ServiceB