I'm trying to run a function on separate threads using asyncio and futures. I have a decorator that takes a long-running function and its arguments, runs it asynchronously, and returns its result. Unfortunately, the processes do not seem to be running asynchronously.
def multiprocess(self, function, executor=None, *args, **kwargs):
    async def run_task(function, *args, **kwargs):
        @functools.wraps(function)
        async def wrap(*args, **kwargs):
            while True:
                execution_runner = executor or self._DEFAULT_POOL_
                executed_job = execution_runner.submit(function, *args, **kwargs)
                print(
                    f"Pending {function.__name__}:",
                    execution_runner._work_queue.qsize(),
                    "jobs",
                )
                print(
                    f"Threads: {function.__name__}:", len(execution_runner._threads)
                )
                future = await asyncio.wrap_future(executed_job)
                return future

        return wrap

    return asyncio.run(run_task(function, *args, **kwargs))
To call the decorator I have two functions, _async_task and task_function. _async_task contains a loop that runs task_function for each document that needs to be processed.
@staticmethod
def _async_task(documents):
    processed_docs = asyncio.run(task_function(documents))
    return processed_docs
task_function processes each document in documents as below:
@multiprocess
async def task_function(documents):
    processed_documents = None
    try:
        for doc in documents:
            processed_documents = process_document(doc)
            print(processed_documents)
    except Exception as err:
        print(err)
    return processed_documents
The clue that this doesn't work asynchronously is that the diagnostics I have for the multithreading decorator print the following.
Pending summarise_news: 0 jobs
Threads: summarise_news: 2
Since there are no pending jobs, and the entire process takes as long as the synchronous run, it's running synchronously.
I had some issues setting up your code, but I think I've come up with an answer.
First of all, as @dano mentioned in his comment, asyncio.run blocks until the coroutine it runs has completed. Thus, you won't get any speedup from this approach.
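To see why, here's a standalone sketch (not taken from your code): each call to asyncio.run blocks the calling thread until its coroutine has finished, so two consecutive calls can never overlap.

import asyncio
import time

async def work():
    await asyncio.sleep(1)

start = time.time()
asyncio.run(work())  # blocks for ~1 second
asyncio.run(work())  # blocks for another ~1 second; the two never overlap
print(f"elapsed: {time.time() - start:.1f}s")  # roughly 2 seconds total

That's effectively what happens when the decorator wraps every submission in its own asyncio.run call.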
I used a slightly modified multiprocess decorator:
def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):
            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)
            return future

        return wrap

    return run_task
As you can see, there's no asyncio.run here, and both the decorator and the inner wrapper are synchronous, since asyncio.wrap_future does not need await.
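In other words, calling a function decorated this way just submits the job to the pool and hands you back an asyncio-compatible future; any awaiting happens at the call site. Here's a minimal sketch, assuming the modified multiprocess decorator above plus the asyncio and time imports (slow_square is a made-up example function):

@multiprocess()
def slow_square(x):
    # hypothetical blocking function, stands in for your real work
    time.sleep(1)
    return x * x

async def caller():
    fut = slow_square(3)  # plain synchronous call: submits the job, returns an asyncio.Future
    print(await fut)      # the suspension happens here, at the call site -> prints 9

asyncio.run(caller())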
The updated multiprocess decorator is now used with the process_document function. The reason for that is that you won't get any benefit from parallelizing a function that calls blocking functions in sequence. You have to convert your blocking function itself to be runnable in an executor instead.
NOTE that this dummy process_document is exactly like I described - fully blocking and synchronous.
@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")
Now, to the last point. We already made process_document kind of asynchronous by converting it to be runnable in an executor, BUT it still matters HOW exactly you invoke it.
Consider the following examples:
for doc in documents:
    result = await process_document(doc)

results = await asyncio.gather(*[process_document(doc) for doc in documents])
In the former, we wait for the futures sequentially, letting each one finish before starting the next. In the latter, they are all executed in parallel, so it really depends on how exactly you invoke them.
Here's the full code snippet I used:
import asyncio
import concurrent.futures
import time

DEFAULT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):
            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)
            return future

        return wrap

    return run_task


@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")


async def task_function_sequential(documents):
    start = time.time()
    for doc in documents:
        await process_document(doc)
    end = time.time()
    print(f"task_function_sequential took: {end-start}s")


async def task_function_parallel(documents):
    start = time.time()
    jobs = [process_document(doc) for doc in documents]
    await asyncio.gather(*jobs)
    end = time.time()
    print(f"task_function_parallel took: {end-start}s")


async def main():
    documents = [i for i in range(5)]
    await task_function_sequential(documents)
    await task_function_parallel(documents)


asyncio.run(main())
Notice that the task_function_parallel example still takes around 4 seconds instead of 2, because the thread pool is limited to 4 workers while the number of jobs is 5, so the last job has to wait for a worker to become available.
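If the work is I/O-bound and you can afford one thread per document, a simple tweak (just a sketch sized for this 5-document example, not a general recommendation) is to give the pool at least as many workers as jobs, so nothing has to queue:

# Hypothetical tweak for this example: one worker per document, so no job waits in the queue.
DEFAULT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5)

With that change, task_function_parallel should finish in roughly the duration of a single job, about 2 seconds here.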