Tags: python, jupyter-notebook, python-asyncio

How to correctly schedule asyncio code and wait for its result from a synchronous context in a Jupyter notebook?


I have a small utility for calling synchronous code in parallel using asyncio.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        print('running loop scheduling there')
        # implement the correct run inside the loop, without the run_until_complete which is crashing, because the loop already runs
        future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
                                                loop)
        print('got the future')
        res = future.result()
        print('got the result')
        return res
    else:
        return loop.run_until_complete(call_many_async(fun, many_kwargs))

It works well when run from plain Python:

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

This runs without any problem from plain Python, but when I use it from IPython in Jupyter I only get

running loop scheduling there
got the future

and it hangs forever.

Originally I had just

def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(call_many_async(fun, many_kwargs))

but there I was getting the error

RuntimeError: This event loop is already running

How to solve it?

Of course the

results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4

works, but I want to use call_many as part of a larger codebase that I will be calling from a Jupyter notebook. I have read https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7 but did not find a solution there, because I do not want to call the asynchronous code directly from a notebook cell, but from synchronous code.

I want to avoid solutions using async def call_many(fun, many_kwargs), because the whole point is to be able to use the code that calls this function from several places without needing sync and async versions of the same thing.

I have seen "How do I run Python asyncio code in a Jupyter notebook?", but that explains how to call asyncio code directly, which, as explained above, is not what I am after.


Solution

  • Jupyter runs its own event loop in the main thread, which is why you can use await directly in a notebook cell. The documentation for asyncio.run_coroutine_threadsafe states:

    This function is meant to be called from a different OS thread than the one where the event loop is running.

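    So waiting for the result on the same thread is a deadlock: future.result() blocks the very thread that the event loop needs in order to run the coroutine. I would probably just run your event loop in another thread.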
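
The hang can be reproduced outside Jupyter with a short sketch. Here `main()` plays the role of the notebook cell: it runs on the event loop's thread and blocks on `future.result()`. The `answer` coroutine, the `outcome` variable, and the 0.5 second timeout are illustrative assumptions, added only so the demonstration ends instead of hanging forever.

```python
import asyncio
import concurrent.futures


async def answer():
    # Trivial coroutine that would finish at once if the loop got a chance to run it.
    return 42


async def main():
    loop = asyncio.get_running_loop()
    # Schedule the coroutine on the loop that is executing main() right now.
    future = asyncio.run_coroutine_threadsafe(answer(), loop)
    try:
        # Blocking here stalls the loop's own thread, so answer() never starts.
        # The timeout exists only to keep this demonstration from hanging.
        future.result(timeout=0.5)
    except concurrent.futures.TimeoutError:
        future.cancel()
        return "timed out"
    return "completed"


outcome = asyncio.run(main())
print(outcome)  # timed out
```

Running the loop in a separate thread, as in the code that follows, avoids this because the waiting thread and the loop's thread are no longer the same.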

    import threading
    import asyncio
    
    async def call_many_async(fun, many_kwargs):
        return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
        
    def call_many(fun, many_kwargs):
        result = None
        def run_func():
            nonlocal result
            # asyncio.run creates a fresh event loop in this thread and closes it afterwards
            result = asyncio.run(call_many_async(fun, many_kwargs))
        thread = threading.Thread(target=run_func)
        thread.start()
        thread.join()
        return result
    
    import time
    def something_complex(param) -> int:
        print(f"call started with {param=}")
        time.sleep(0.1) # calling some time-costly API
        print("call ended")
        return 3 # returning the result
    
    results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
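
One caveat with a bare `threading.Thread` is that an exception raised inside the thread is not re-raised in the caller, so `call_many` would quietly return `None`. A possible variant, sketched here under the assumption that a single-worker `ThreadPoolExecutor` is an acceptable replacement for the thread, lets `Future.result()` carry the exception back. The `checked_square` function and the `run_in_fresh_loop` helper are hypothetical names used only for illustration.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(
        *[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs]
    )


def call_many(fun, many_kwargs):
    def run_in_fresh_loop():
        # asyncio.run creates a new event loop in the worker thread and closes it when done.
        return asyncio.run(call_many_async(fun, many_kwargs))

    # The single worker thread hosts the event loop; result() blocks the caller
    # and re-raises any exception that escaped the coroutine.
    with ThreadPoolExecutor(max_workers=1) as runner:
        return runner.submit(run_in_fresh_loop).result()


def checked_square(value):
    # Hypothetical workload that fails on negative input.
    if value < 0:
        raise ValueError("negative input")
    return value * value


squares = call_many(checked_square, [{"value": v} for v in range(4)])
print(squares)  # [0, 1, 4, 9]

try:
    call_many(checked_square, [{"value": -1}])
except ValueError as exc:
    error_message = str(exc)
    print(error_message)  # negative input
```

Because the event loop lives entirely in the worker thread, this should behave the same whether or not the calling thread already has a running loop, which is the situation in a notebook cell.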
    

    Otherwise, you can use async calls all the way down from Jupyter's top-level execution point to your code, by making every function in the chain async and awaiting each call.

    import asyncio
    
    async def call_many_async(fun, many_kwargs):
        return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
        
    async def call_many(fun, many_kwargs):
        return await call_many_async(fun, many_kwargs)
    
    fut = call_many(something_complex, ({"param": i} for i in range(1, 5)))
    results = await fut
    

    Lastly, if you are only using to_thread, you can create a ThreadPoolExecutor and call its .map method directly, without creating or using an event loop at all.

    from concurrent.futures import ThreadPoolExecutor
    
    def call_many(fun, many_kwargs):
        with ThreadPoolExecutor() as pool:
            # unpack each kwargs dict so fun is called the same way as in the asyncio version
            return list(pool.map(lambda kwargs: fun(**kwargs), many_kwargs))
    
    import time
    def something_complex(param) -> int:
        print(f"call started with {param=}")
        time.sleep(0.1) # calling some time-costly API
        print("call ended")
        return 3 # returning the result
    
    results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
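
When swapping `asyncio.gather` for `pool.map`, it may help to know that both return results in input order, regardless of which call finishes first. The sketch below illustrates this under a few assumptions: `slow_echo` is a hypothetical workload, `max_workers` is an extra parameter not present in the original, and each item is treated as a dict of keyword arguments, as in the question.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def call_many(fun, many_kwargs, max_workers=None):
    # max_workers is a hypothetical extra knob; None keeps the executor's default.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each item is a dict of keyword arguments, so unpack it for the call.
        return list(pool.map(lambda kwargs: fun(**kwargs), many_kwargs))


def slow_echo(value, delay):
    # Hypothetical stand-in for a time-costly API call.
    time.sleep(delay)
    return value


# The first call sleeps the longest, yet its result still comes first.
ordered = call_many(
    slow_echo,
    [
        {"value": "a", "delay": 0.2},
        {"value": "b", "delay": 0.1},
        {"value": "c", "delay": 0.0},
    ],
)
print(ordered)  # ['a', 'b', 'c']
```

Note that `pool.map` re-raises the first exception it encounters when the results are consumed, which here happens inside the `list(...)` call.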