Tags: python-3.x, asynchronous, python-asyncio, concurrent.futures

Running asyncio code with executors raises an error saying my return type is not callable


In the code below, I am trying to find all the files whose path contains '.py'. I do a breadth-first search on the list of directories in my path and recursively call the same function on the resulting list of subdirectories. I get an error saying:

Future exception was never retrieved
future: <Future finished exception=TypeError("'generator' object is not callable")>
Traceback (most recent call last):

The code is given below:

import asyncio
from pathlib import Path
from os.path import sep as pathsep
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(4)

async def main(basedir, query):
    loop.run_in_executor(None, find_files(basedir, query) )

@asyncio.coroutine
def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
            fullpath = str(p.absolute)
            if p.is_dir and not p.is_symlink():
                subdirs.append(p)
            if query_string in fullpath:
                print(fullpath)
    loop =  asyncio.get_event_loop()
    tasks = [loop.run_in_executor(executor, find_files, subdir, query_string) for subdir in subdirs] # this doesnt work
    yield from asyncio.gather(*tasks)
    return subdirs


query = '.py'
basedir = Path(pathsep).absolute() 

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_default_executor(executor)
    try:
        loop.run_until_complete(main(basedir, query))
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        executor.shutdown(wait = True)
        loop.close()


EDIT: I understand my errors now, thanks to user4815162342's answer below. This works:

import asyncio
from pathlib import Path
from os.path import sep as pathsep
from collections import deque
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

query = '.py'
basedir = Path(pathsep).absolute() 
futures = deque()

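def find_files(path, query_string):
    # Print every entry whose full path contains query_string and
    # return the subdirectories that still need to be searched.
    subdirs = []
    try:
        for p in path.iterdir():
            fullpath = str(p.absolute())  # call the method; the bare attribute is a bound method
            if p.is_dir() and not p.is_symlink():
                subdirs.append(p)
            if query_string in fullpath:
                print(fullpath)
    except OSError:
        # The original try had no handler; skipping unreadable directories is an assumption.
        pass
    return subdirs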

async def main(executor):
    loop = asyncio.get_event_loop()
    task = [loop.run_in_executor(executor, find_files, basedir, query) ]
    completed, _ = await asyncio.wait(task)
    result = [t.result() for t in completed]
    futures.append(result[0])
    while futures:
        future = futures.popleft()
        for subdir in future:
            task = [loop.run_in_executor(executor, find_files, subdir, query) ]
            completed, _ = await asyncio.wait(task)
            result = [t.result() for t in completed][0]
            futures.append(task[0].result() )


if __name__ == "__main__":
    tstart = time.time()
    executor = ThreadPoolExecutor(
        max_workers=4,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            main(executor)
        )
    finally:
        event_loop.close()
    print('time elapsed is', time.time() - tstart)

Solution

  • There are several issues with your code.

    • First, run_in_executor expects a callable, which it invokes in a different thread. Awaiting the future it returns suspends the current coroutine and resumes it once the callable has returned a value or raised an exception. In other words, run_in_executor expects a function, and you are giving it the result of calling find_files(basedir, query) instead. Because find_files is a generator-based coroutine, that result is a generator object, which is why the exception says that a 'generator' object is not callable.

    • Second, you should await the result of run_in_executor. The lack of await is the reason you are warned that the exception is never retrieved.

    • Finally, the function you are invoking is already a coroutine, so you don't need run_in_executor in the first place. run_in_executor is only for invoking CPU-bound or legacy blocking code. Just await the coroutine directly: await find_files(...).
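To illustrate the first two points, here is a minimal sketch of calling run_in_executor with a callable and its arguments and awaiting the result. The helper list_subdirs and the temporary directory layout are hypothetical and exist only for this example.

```python
import asyncio
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def list_subdirs(path):
    # Hypothetical blocking helper: a plain function, not a coroutine.
    return sorted(p for p in path.iterdir() if p.is_dir() and not p.is_symlink())


async def main(basedir):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Wrong: loop.run_in_executor(executor, list_subdirs(basedir))
        # would call the function here and hand its result to the executor.
        # Right: pass the function itself, followed by its arguments,
        # and await the returned future so exceptions are retrieved.
        return await loop.run_in_executor(executor, list_subdirs, basedir)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        (base / "a").mkdir()
        (base / "b").mkdir()
        (base / "note.py").write_text("")
        print([p.name for p in asyncio.run(main(base))])
```

Because the future is awaited, any exception raised inside list_subdirs propagates to the caller instead of producing the "exception was never retrieved" warning.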

    Please also note that you should be using async def and await in preference to the asyncio.coroutine decorator and yield from. Generator-based coroutines were deprecated in Python 3.8 and removed in Python 3.11.
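As a rough sketch of how the search might look with async def and await, the version below keeps the blocking directory scan in a plain function and awaits the recursive coroutine calls directly. The split into scan_dir and find_files, matching on the file name rather than the full path, and skipping unreadable directories are all assumptions made for this example, not part of the original code.

```python
import asyncio
from pathlib import Path


def scan_dir(path, query_string):
    # Blocking filesystem work stays in a plain function.
    matches, subdirs = [], []
    try:
        for p in path.iterdir():
            if query_string in p.name:
                matches.append(p)
            if p.is_dir() and not p.is_symlink():
                subdirs.append(p)
    except OSError:
        pass  # assumption: silently skip unreadable directories
    return matches, subdirs


async def find_files(path, query_string):
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor.
    matches, subdirs = await loop.run_in_executor(None, scan_dir, path, query_string)
    # find_files is a coroutine, so it is awaited directly (here via gather,
    # which runs the recursive calls concurrently) instead of being sent
    # to an executor.
    nested = await asyncio.gather(*(find_files(d, query_string) for d in subdirs))
    for found in nested:
        matches.extend(found)
    return matches
```

It could be run with something like asyncio.run(find_files(Path("."), ".py")), which returns the matching paths instead of printing them.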