python python-asyncio aiohttp asyncpg

How can I have a synchronous facade over asyncpg APIs with Python asyncio?


Imagine an asynchronous aiohttp web application that is backed by a PostgreSQL database, connected via asyncpg, and does no other I/O. How can I have a middle layer hosting the application logic that is not async? (I know I can simply make everything async, but imagine my app has massive application logic, bound only by database I/O, and I cannot touch all of it.)

Pseudo code:

async def handler(request):
    # call into layers upon layers of application code that simply emit SQL
    ...

def application_logic():
    ...
    # This doesn't work, obviously, as await is a syntax
    # error inside synchronous code.
    data = await asyncpg_conn.execute("SQL")
    ...
    # What I want is this:
    data = asyncpg_facade.execute("SQL")
    ...

How can a synchronous façade over asyncpg be built that allows the application logic to make database calls? The recipes floating around, such as asyncio.run() or asyncio.run_coroutine_threadsafe(), do not work in this case, as we're coming from an already asynchronous context. I'd assume this must be possible, since there already is an event loop that could in principle run the asyncpg coroutine.
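To illustrate the problem, here is a minimal sketch of what happens when asyncio.run() is called from synchronous code that was itself invoked from a coroutine. The names query, application_logic and handler are placeholders, and asyncio.sleep() stands in for a real asyncpg call, so no database is needed to try it.

```python
import asyncio

async def query():
    # Stand-in for an asyncpg call (assumption: no real database here).
    await asyncio.sleep(0.01)
    return "row"

def application_logic():
    # Synchronous code, but called while an event loop is already running.
    coro = query()
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"RuntimeError: {exc}"

async def handler():
    return application_logic()

result = asyncio.run(handler())
print(result)
```

asyncio.run() refuses to start a second loop in a thread that already has a running one, so the naive recipe fails before the query is even scheduled.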

Bonus question: what is the design rationale for making await inside a synchronous function a syntax error? Wouldn't it be pretty useful to allow await from any context that originated from a coroutine, so we'd have a simple means of decomposing an application into functional building blocks?

EDIT: Extra bonus: beyond Paul's very good answer, which stays inside the "safe zone", I'd be interested in solutions that avoid blocking the main thread (leading to something more gevent-ish). See also my comment on Paul's answer ...


Solution

  • You need to create a secondary thread in which you run your async code. Initialize the secondary thread with its own event loop, which runs forever. Execute each async function by calling asyncio.run_coroutine_threadsafe() and then calling result() on the returned object. That object is an instance of concurrent.futures.Future, and its result() method doesn't return until the coroutine's result is ready from the secondary thread.

    Your main thread is then, in effect, calling each async function as if it were a sync function. The main thread doesn't proceed until each function call is finished. BTW it doesn't matter if your sync function is actually running in an event loop context or not.

    The calls to result() will, of course, block the main thread's event loop. That can't be avoided if you want to get the effect of running an async function from sync code.

    Needless to say, this is an ugly thing to do and it's suggestive of the wrong program structure. But you're trying to convert a legacy program, and it may help with that.

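    import asyncio
    import threading
    from datetime import datetime

    # f1 is missing from the listing; this definition is inferred from the output below.
    async def f1(x):
        await asyncio.sleep(x)
        print("Hello", x, datetime.now())
        return x

    def main():
        def thr(loop):
            # Run the secondary event loop forever in this thread.
            asyncio.set_event_loop(loop)
            loop.run_forever()

        loop = asyncio.new_event_loop()
        t = threading.Thread(target=thr, args=(loop,), daemon=True)
        t.start()

        print("Hello", datetime.now())
        # Each result() call blocks the main thread until the coroutine finishes.
        t1 = asyncio.run_coroutine_threadsafe(f1(1.0), loop).result()
        t2 = asyncio.run_coroutine_threadsafe(f1(2.0), loop).result()
        print(t1, t2)


    if __name__ == "__main__":
        main()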
    
    >>> Hello 2021-10-26 20:37:00.454577
    >>> Hello 1.0 2021-10-26 20:37:01.464127
    >>> Hello 2.0 2021-10-26 20:37:03.468691
    >>> 1.0 2.0
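The same idea can be packaged as a small façade object. This is only a sketch under assumptions: SyncFacade, fake_execute and application_logic are hypothetical names, and asyncio.sleep() stands in for the asyncpg call. With real asyncpg, the connection or pool would presumably have to be created on the façade's loop as well (for example via facade.run(asyncpg.connect(...))), since it is tied to the loop it was created on.

```python
import asyncio
import threading

class SyncFacade:
    """Hypothetical helper: runs coroutines on a private loop in a daemon thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        # Schedule the coroutine on the worker loop and block the calling
        # thread until its result is available.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        # Stop the worker loop from the outside, then clean up.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

async def fake_execute(sql):
    # Stand-in for an asyncpg call (assumption: no real database here).
    await asyncio.sleep(0.01)
    return f"result of {sql}"

def application_logic(facade):
    # Plain synchronous code; no await needed.
    return facade.run(fake_execute("SELECT 1"))

async def handler(facade):
    # Note: this blocks the handler's own event loop while the query runs.
    return application_logic(facade)

facade = SyncFacade()
data = asyncio.run(handler(facade))
facade.close()
print(data)
```

As in the answer above, the handler's event loop is blocked for the duration of each call; the façade only hides the thread hand-off from the application logic.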