I have a small utility for calling synchronous code in parallel using asyncio.
import asyncio
async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])

def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        print('running loop scheduling there')
        # schedule on the already-running loop; run_until_complete would
        # crash here because the loop is already running
        future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
                                                  loop)
        print('got the future')
        res = future.result()
        print('got the result')
        return res
    else:
        return loop.run_until_complete(call_many_async(fun, many_kwargs))
and it works well when used from python
import time

def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1)  # calling some time-costly API
    print("call ended")
    return 3  # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
This works without any problem from plain Python, but I have a problem using it from IPython.
In Jupyter, I just get
running loop scheduling there
got the future
and it hangs forever.
Originally I had just
def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(call_many_async(fun, many_kwargs))
but there I was getting the error
RuntimeError: This event loop is already running
How to solve it?
Of course the
results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4
works, but I want to use call_many
as part of a larger codebase which I will be calling from a Jupyter notebook.
I have read https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7 but I did not find the solution there, because I do not want to call the asynchronous code directly from a Jupyter notebook cell, but from synchronous code.
I want to avoid solutions using async def call_many(fun, many_kwargs),
because the whole point is to be able to call this function from several places without needing sync and async equivalents of the same thing.
I have seen How do I run Python asyncio code in a Jupyter notebook? but that explains how to call asyncio code directly, which, as I explain above, I'm not interested in.
Jupyter runs its own event loop in the main thread, which is why you can call await
from Jupyter directly. Also, the docs for asyncio.run_coroutine_threadsafe state:
This function is meant to be called from a different OS thread than the one where the event loop is running.
so it is a deadlock to wait for its result on the same thread. I would probably just run your event loop in another thread.
import threading
import asyncio

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])

def call_many(fun, many_kwargs):
    result = None

    def run_func():
        nonlocal result
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(call_many_async(fun, many_kwargs))

    thread = threading.Thread(target=run_func)
    thread.start()
    thread.join()
    return result
import time

def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1)  # calling some time-costly API
    print("call ended")
    return 3  # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
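If call_many is invoked often, starting a fresh thread and event loop per call adds overhead. A variant (a sketch of mine — the _loop and _loop_thread module-level names are my own) keeps one background loop alive for the lifetime of the process; since the caller is then always on a different thread than the loop, run_coroutine_threadsafe is used exactly as its documentation requires:

```python
import asyncio
import threading

# one long-lived event loop running in a daemon thread
# (hypothetical module-level names _loop / _loop_thread)
_loop = asyncio.new_event_loop()
_loop_thread = threading.Thread(target=_loop.run_forever, daemon=True)
_loop_thread.start()

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])

def call_many(fun, many_kwargs):
    # the caller is on a different thread than the one running _loop,
    # so run_coroutine_threadsafe + future.result() cannot deadlock here
    future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs), _loop)
    return future.result()

results = call_many(lambda param: param * 2, ({"param": i} for i in range(3)))
# results == [0, 2, 4]
```

This trades per-call setup cost for a permanently occupied thread; either approach works from both plain Python and Jupyter.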
Otherwise you can just stack async
calls all the way from Jupyter's main execution point down to your code, by making all the functions async
and awaiting
each and every one.
import asyncio

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])

async def call_many(fun, many_kwargs):
    return await call_many_async(fun, many_kwargs)
# calling an async def function returns a coroutine, which must be awaited
coro = call_many(something_complex, ({"param": i} for i in range(1, 5)))
results = await coro
Lastly, if you are only using to_thread,
you might just create a ThreadPoolExecutor and call it
directly, without creating or using an event loop at all. Note that each kwargs dict has to be unpacked explicitly, since the executor passes arguments positionally.
from concurrent.futures import ThreadPoolExecutor

def call_many(fun, many_kwargs):
    with ThreadPoolExecutor() as pool:
        # submit unpacks each kwargs dict, matching the fun(**kwargs) calls above
        futures = [pool.submit(fun, **kwargs) for kwargs in many_kwargs]
        return [f.result() for f in futures]
import time

def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1)  # calling some time-costly API
    print("call ended")
    return 3  # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
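Putting the pieces together, one way to keep the single synchronous entry point the question asks for is to dispatch on whether a loop is already running: use asyncio.run when none is, and fall back to a helper thread when one is (as in a Jupyter cell). This is a sketch combining the ideas above, not a drop-in from any library:

```python
import asyncio
import threading

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])

def call_many(fun, many_kwargs):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # no loop running in this thread (plain Python): asyncio.run is enough
        return asyncio.run(call_many_async(fun, many_kwargs))

    # a loop is already running in this thread (e.g. Jupyter):
    # run a fresh loop in a helper thread and wait for it
    result = None

    def run_func():
        nonlocal result
        result = asyncio.run(call_many_async(fun, many_kwargs))

    thread = threading.Thread(target=run_func)
    thread.start()
    thread.join()
    return result

results = call_many(lambda param: param + 1, ({"param": i} for i in range(3)))
# results == [1, 2, 3]
```

The same call_many then works unchanged from a script, a REPL, or a notebook cell.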