In the code below, I am trying to find all the files whose path contains the extension '.py'. I do a breadth-first search on the list of directories in my path and recursively call the same function on the resulting list of subdirectories. I get an error saying:
Future exception was never retrieved
future: <Future finished exception=TypeError("'generator' object is not callable")>
Traceback (most recent call last):
the code is given below
from pathlib import Path
from os.path import sep as pathsep
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(4)
# Entry coroutine of the failing attempt: hands the directory search to the
# loop's default executor.
# NOTE(review): find_files(basedir, query) is *called* on the next line, so
# run_in_executor receives the generator object it returns instead of a
# callable -- this matches the "'generator' object is not callable" TypeError
# quoted above. The future returned by run_in_executor is also never awaited.
# `loop` is the module-level name assigned under the __main__ guard.
async def main(basedir, query):
loop.run_in_executor(None, find_files(basedir, query) )
# Generator-based coroutine: iterates over the entries of `path`, prints every
# entry whose stringified path contains `query_string`, collects entries into
# `subdirs`, schedules itself on the executor once per collected entry, and
# finally returns `subdirs`.
@asyncio.coroutine
def find_files(path, query_string):
subdirs = []
for p in path.iterdir():
# NOTE(review): `p.absolute` is referenced but not called, so `fullpath` is
# the str() of a bound method rather than of a path.
fullpath = str(p.absolute)
# NOTE(review): `p.is_dir` is likewise not called; a bound method is always
# truthy, so only the is_symlink() test actually filters entries here.
if p.is_dir and not p.is_symlink():
subdirs.append(p)
if query_string in fullpath:
print(fullpath)
loop = asyncio.get_event_loop()
# NOTE(review): find_files is a generator function, so invoking it from a
# worker thread only creates a generator object; the body does not run there.
tasks = [loop.run_in_executor(executor, find_files, subdir, query_string) for subdir in subdirs] # this doesnt work
yield from asyncio.gather(*tasks)
return subdirs
# Search parameters: the substring to look for, and the starting directory
# built from the OS path separator.
query = '.py'
basedir = Path(pathsep).absolute()
if __name__ == "__main__":
# `loop` becomes a module-level name here; main() relies on it.
loop = asyncio.get_event_loop()
# Make the 4-worker pool the loop's default executor, i.e. the one used when
# run_in_executor is given None.
loop.set_default_executor(executor)
try:
loop.run_until_complete(main(basedir, query))
finally:
# Cleanup runs whether or not main() raised.
loop.run_until_complete(loop.shutdown_asyncgens())
executor.shutdown(wait = True)
loop.close()
'''
EDIT: Having understood my mistakes from user4815162342's answer below, I now have this working version:
import asyncio
from pathlib import Path
from os.path import sep as pathsep
from collections import deque
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
query = '.py'
basedir = Path(pathsep).absolute()
futures = deque()
def find_files(path, query_string):
    """Print the children of *path* whose absolute path contains *query_string*.

    Only the immediate children of ``path`` are examined; the caller is
    expected to call this function again for each returned subdirectory.

    Args:
        path: A ``pathlib.Path`` pointing at the directory to scan.
        query_string: Substring to look for in each child's absolute path.

    Returns:
        The child directories of ``path`` (symlinks excluded, so link cycles
        cannot be followed). If ``path`` cannot be listed, the directories
        collected up to that point are returned -- possibly an empty list.
    """
    subdirs = []
    try:
        for p in path.iterdir():
            # absolute() and is_dir() must be *called*: the bare attributes
            # are bound methods, which are always truthy and stringify to a
            # method repr rather than to the path.
            fullpath = str(p.absolute())
            if p.is_dir() and not p.is_symlink():
                subdirs.append(p)
            if query_string in fullpath:
                print(fullpath)
    except OSError:
        # Best effort: directories that are unreadable, have vanished, or are
        # not directories at all are skipped instead of aborting the search.
        pass
    return subdirs
async def main(executor):
    """Search the tree under ``basedir`` breadth-first for ``query``.

    Every directory of the current level is handed to ``find_files`` on
    *executor*; the subdirectories those calls return form the next level.
    The search ends when a level yields no further subdirectories.

    Args:
        executor: A ``concurrent.futures`` executor on which the blocking
            ``find_files`` calls are run.

    Raises:
        Whatever ``find_files`` raises is propagated to the caller.
    """
    loop = asyncio.get_event_loop()
    level = [basedir]
    while level:
        # Submit the whole level before awaiting anything, so that all worker
        # threads are kept busy; awaiting each job right after submitting it
        # would run the scans strictly one at a time.
        listings = await asyncio.gather(
            *(
                loop.run_in_executor(executor, find_files, directory, query)
                for directory in level
            )
        )
        # Flatten the per-directory results into the next level to visit.
        level = [subdir for listing in listings for subdir in listing]
if __name__ == "__main__":
    # Script entry point: run the search on a 4-thread pool and report the
    # wall-clock time it took.
    tstart = time.time()
    # The context manager shuts the pool down (joining its worker threads)
    # even when the search fails, instead of leaving that to interpreter exit.
    with ThreadPoolExecutor(max_workers=4) as executor:
        event_loop = asyncio.get_event_loop()
        try:
            event_loop.run_until_complete(main(executor))
        finally:
            event_loop.close()
    print('time elapsed is', time.time() - tstart)
There are several issues with your code.
First, `run_in_executor` expects a callable object, which it will invoke in a different thread; it suspends the execution of the current coroutine and resumes it once the callable has returned a value or raised an exception. In other words, `run_in_executor` expects a function and you are giving it a coroutine object instead, which is why you get an exception saying that such an object is not callable.
Second, you should await the result of `run_in_executor`. The lack of `await` is the reason you are warned that the exception is never retrieved.
Finally, the function you are invoking is already a coroutine, so you don't need `run_in_executor` in the first place. `run_in_executor` is only for invoking CPU-bound or legacy blocking code. Just await it directly: `await find_files(...)`.
Please also note that you should be using `async def` and `await` in preference to the `asyncio.coroutine` decorator and `yield from`. The latter is deprecated and will be removed soon.