Search code examples
pythonasynchronousmultiprocessingpython-asynciopython-trio

Python Asyncio/Trio for Asynchronous Computing/Fetching


I am looking for a way to efficiently fetch a chunk of values from disk, and then perform computation/calculations on the chunk. My thought was a for loop that would run the disk fetching task first, then run the computation on the fetched data. I want to have my program fetch the next batch as it is running the computation so I don't have to wait for another data fetch every time a computation completes. I expect the computation will take longer than the fetching of the data from disk, and likely cannot be done truly in parallel due to a single computation task already pinning the cpu usage at near 100%.

I have provided some code below in python using trio (but could alternatively be used with asyncio to the same effect) to illustrate my best attempt at performing this operation with async programming:

import trio
import numpy as np
from datetime import datetime as dt
import time

testiters=10
dim = 6000


def generateMat(arrlen):
    for _ in range(30):
        retval= np.random.rand(arrlen, arrlen)
    # print("matrix generated")
    return retval

def computeOpertion(matrix):
    return np.linalg.inv(matrix)


def runSync():
    for _ in range(testiters):
        mat=generateMat(dim)
        result=computeOpertion(mat)
    return result

async def matGenerator_Async(count):
    for _ in range(count):
        yield generateMat(dim)

async def computeOpertion_Async(matrix):
    return computeOpertion(matrix)

async def runAsync():
    async with trio.open_nursery() as nursery:
        async for value in matGenerator_Async(testiters): 
            nursery.start_soon(computeOpertion_Async,value)
            #await computeOpertion_Async(value)

            

print("Sync:")
start=dt.now()
runSync()
print(dt.now()-start)

print("Async:")
start=dt.now()
trio.run(runAsync)
print(dt.now()-start)

This code will simulate getting data from disk by generating 30 random matrices, which uses a small amount of cpu. It will then perform matrix inversion on the generated matrix, which uses 100% cpu (with openblas/mkl configuration in numpy). I compare the time taken to run the tasks by timing the synchronous and asynchronous operations.

From what I can tell, both jobs take exactly the same amount of time to finish, meaning the async operation did not speed up the execution. Observing the behavior of each computation, the sequential operation runs the fetch and computation in order and the async operation runs all the fetches first, then all the computations afterwards.

Is there a way to use asynchronously fetch and compute? Perhaps with futures or something like gather()? Asyncio has these functions, and trio has them in a seperate package trio_future. I am also open to solutions via other methods (threads and multiprocessing).

I believe that there likely exists a solution with multiprocessing that can make the disk reading operation run in a separate process. However, inter-process communication and blocking then becomes a hassle, as I would need some sort of semaphore to control how many blocks could be generated at a time due to memory constraints, and multiprocessing tends to be quite heavy and slow.

EDIT

Thank you VPfB for your answer. I am not able to sleep(0) in the operation, but I think even if I did, it would necessarily block the computation in favor of performing disk operations. I think this may be a hard limitation of python threading and asyncio, that it can only execute 1 thread at a time. Running two different processes simultaneously is impossible if both require anything but waiting for some external resource to respond from your CPU.

Perhaps there is a way with an executor for a multiprocessing pool. I have added the following code below:

import asyncio
import concurrent.futures

async def asynciorunAsync():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:    
         async for value in matGenerator_Async(testiters):              
            result = await loop.run_in_executor(pool, computeOpertion,value)


print("Async with PoolExecutor:")
start=dt.now()
asyncio.run(asynciorunAsync())
print(dt.now()-start)

Although timing this, it still takes the same amount of time as the synchronous example. I think I will have to go with a more involved solution as it seems that async and await are too crude of a tool to properly do this type of task switching.


Solution

  • I don't work with trio, my answer it asyncio based.

    Under these circumstances the only way to improve the asyncio performance I see is to break the computation into smaller pieces and insert await sleep(0) between them. This would allow the data fetching task to run.

    Asyncio uses cooperative scheduling. A synchronous CPU bound routine does not cooperate, it blocks everything else while it is running.

    sleep() always suspends the current task, allowing other tasks to run.

    Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.

    (quoted from: asyncio.sleep)


    If that is not possible, try to run the computation in an executor. This adds some multi-threading capabilities to otherwise pure asyncio code.