Python: concurrently waiting on an async coroutine and a synchronous function


I'd like to establish an SSH SOCKS tunnel (using asyncssh) during the execution of a synchronous function. When the function is done I want to tear down the tunnel and exit.

Apparently some async function has to be awaited to keep the tunnel working, so the important thing is that conn.wait_closed() and the synchronous function are executed concurrently. I am therefore fairly sure that I need a second thread. I first tried some saner things using a ThreadPoolExecutor with run_in_executor, but ended up with the abysmal multithreaded variant below.

#! /usr/bin/env python3

import traceback
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

import asyncio, asyncssh, sys

_server="127.0.0.1"
_port=22
_proxy_port=8080


async def run_client():
    conn = await asyncio.wait_for(
        asyncssh.connect(
            _server,
            port=_port,
            options=asyncssh.SSHClientConnectionOptions(client_host_keysign=True),
        ),
        10,
    )

    listener = await conn.forward_socks('127.0.0.1', _proxy_port)
    return conn

async def do_stuff(func):
    try:
        conn = await run_client()
        print("SSH tunnel active")

        def start_loop(loop):
            asyncio.set_event_loop(loop)
            try:
                loop.run_forever()
            except Exception as e:
                print(f"worker loop: {e}")

        async def thread_func():
            ret=await func()
            print("Func done - tearing down worker thread and SSH connection")
            conn.close()
            #  asyncio.get_event_loop().stop()
            return ret

        func_loop = asyncio.new_event_loop()
        func_thread = Thread(target=start_loop, args=(func_loop,))
        func_thread.start()
        print("thread started")
        fut = asyncio.run_coroutine_threadsafe(thread_func(), func_loop)
        print(f"fut scheduled: {fut}")

        done = await asyncio.gather(asyncio.wrap_future(fut), conn.wait_closed())
        print("wait done")
        for ret in done:
            print(f"ret={ret}")

        # Canceling pending tasks and stopping the loop
        #  asyncio.gather(*asyncio.Task.all_tasks()).cancel()

        print("stopping func_loop")
        func_loop.call_soon_threadsafe(func_loop.stop())
        print("joining func_thread")
        func_thread.join()
        print("joined func_thread")

    except (OSError, asyncssh.Error) as exc:
        sys.exit('SSH connection failed: ' + str(exc))
    except (Exception) as exc:
        sys.exit('Unhandled exception: ' + str(exc))
        traceback.print_exc()


async def just_wait():
    print("starting just_wait")
    input()
    print("ending just_wait")
    return 42

asyncio.get_event_loop().run_until_complete(do_stuff(just_wait))

It actually "works" until the very end, where I get an exception while joining the worker thread. I presume this is because something I do is not thread-safe.

Exception in callback None()
handle: <Handle>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
TypeError: 'NoneType' object is not callable

To test the code you must have a local SSH server running with key files set up for your user. You may want to change the _port variable.

I am looking for the reason for the exception and/or a version of the program that requires less manual intervention in the threading and possibly uses just a single event loop. I don't know how to achieve the latter when I want to await the two things (as in the asyncio.gather call).


Solution

  • The immediate cause of your error is this line:

    # incorrect
    func_loop.call_soon_threadsafe(func_loop.stop())
    

    The intention is to call func_loop.stop() in the thread that runs the func_loop event loop. But as written, it calls func_loop.stop() immediately in the current thread and passes its return value (None) to call_soon_threadsafe as the callback. When the worker loop later tries to run that callback, it fails with "TypeError: 'NoneType' object is not callable", which is the traceback you see. To fix the immediate problem, drop the extra parentheses and pass the method itself:

    # correct
    func_loop.call_soon_threadsafe(func_loop.stop)
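
As a minimal sketch of that fix in isolation, the following runs a second event loop in a worker thread and shuts it down from the main thread. It uses only the standard library; the helper names run_loop_in_thread, stop_loop and answer are made up for this illustration and are not part of the question's code.

```python
import asyncio
import threading


def run_loop_in_thread():
    """Start a fresh event loop in a worker thread; return (loop, thread)."""
    loop = asyncio.new_event_loop()

    def worker():
        asyncio.set_event_loop(loop)
        loop.run_forever()  # returns once loop.stop() has run in this thread

    thread = threading.Thread(target=worker)
    thread.start()
    return loop, thread


def stop_loop(loop, thread):
    """Stop the worker's loop from another thread, then wait for the thread."""
    # Pass the bound method itself (no parentheses); the loop calls it
    # from its own thread on the next iteration.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


async def answer():
    await asyncio.sleep(0.01)
    return 42


loop, thread = run_loop_in_thread()
future = asyncio.run_coroutine_threadsafe(answer(), loop)
result = future.result(timeout=5)  # blocks the main thread until done
stop_loop(loop, thread)
print(result)
```

With the callable passed correctly, the thread joins cleanly and no "Exception in callback None()" message is logged.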
    

    However, the code is more complicated than it needs to be:

    • it doesn't make sense to create a new event loop when you are already inside an event loop
    • just_wait shouldn't be async def: it never awaits anything, and its blocking input() call would stall whichever event loop runs it.
    • sys.exit does accept a string (the message is printed to stderr and the exit status becomes 1), but the traceback.print_exc() placed after it never runs, because sys.exit raises SystemExit. Print the traceback first.
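
On the error-handling point, here is a small sketch of the ordering that does print the backtrace. Note that sys.exit accepts a string argument, which is written to stderr with exit status 1. The wrapper name run_and_report is hypothetical and only serves the illustration.

```python
import sys
import traceback


def run_and_report(func):
    """Call func(); on failure print the traceback, then exit with a message."""
    try:
        return func()
    except Exception as exc:
        # print_exc must come first: sys.exit raises SystemExit,
        # so nothing after it in this block would run.
        traceback.print_exc()
        sys.exit(f"Unhandled exception: {exc}")
```

Calling run_and_report(lambda: 1 / 0) prints the ZeroDivisionError traceback and then raises SystemExit carrying the message.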

    To run a non-async function from asyncio, pass it as-is to run_in_executor. You need neither an extra thread nor an extra event loop: run_in_executor runs the function in a thread from the executor's pool and connects it to your current event loop, effectively making the sync function awaitable. For example (untested):

    async def do_stuff(func):
        conn = await run_client()
        print("SSH tunnel active")
        loop = asyncio.get_event_loop()
        ret = await loop.run_in_executor(None, func)
        print(f"ret={ret}")
        conn.close()
        await conn.wait_closed()
        print("wait done")
    
    def just_wait():
        # just_wait is a regular function; it can call blocking code,
        # but it cannot await
        print("starting just_wait")
        input()
        print("ending just_wait")
        return 42
    
    asyncio.get_event_loop().run_until_complete(do_stuff(just_wait))
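
If you also want to notice the tunnel dropping while the function is still running (the "await the two things" part of the question), one possible approach is asyncio.wait with FIRST_COMPLETED on a single loop. The sketch below is runnable without an SSH server because it uses FakeConn, a hypothetical stand-in that mimics only close() and wait_closed(); run_with_tunnel and blocking_job are likewise invented names, and the real asyncssh connection may behave differently.

```python
import asyncio
import time


class FakeConn:
    """Hypothetical stand-in for the SSH connection (close/wait_closed only)."""

    def __init__(self):
        self._closed = asyncio.Event()

    def close(self):
        self._closed.set()

    async def wait_closed(self):
        await self._closed.wait()


async def run_with_tunnel(conn, func):
    """Run blocking func in a worker thread while watching the connection."""
    loop = asyncio.get_running_loop()
    work = loop.run_in_executor(None, func)
    closed = asyncio.ensure_future(conn.wait_closed())
    try:
        done, _ = await asyncio.wait(
            {work, closed}, return_when=asyncio.FIRST_COMPLETED
        )
        if work not in done:
            # The worker thread cannot be interrupted; it keeps running
            # in the background until func returns.
            raise ConnectionError("tunnel closed before the function finished")
        return work.result()
    finally:
        conn.close()
        await closed


def blocking_job():
    time.sleep(0.05)  # stands in for the real synchronous work
    return 42


async def main():
    return await run_with_tunnel(FakeConn(), blocking_job)


result = asyncio.run(main())
print(result)
```

The try/finally also ensures the connection is closed when func raises, which the shorter version above does not handle.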
    

    If you need to await things in just_wait, you can make it async and use run_in_executor for the actual blocking code inside it:

    async def do_stuff():
        conn = await run_client()
        print("SSH tunnel active")
        ret = await just_wait()
        print(f"ret={ret}")
        conn.close()
        await conn.wait_closed()
        print("wait done")
    
    async def just_wait():
        # just_wait is an async function, it can await, but
        # must invoke blocking code through run_in_executor
        print("starting just_wait")
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, input)
        print("ending just_wait")
        return 42
    
    asyncio.run(do_stuff())
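
On Python 3.9 and later, asyncio.to_thread is a shorter way to write the same run_in_executor(None, ...) call. The sketch below assumes that Python version (the traceback in the question is from 3.7, where this function does not exist). blocking_call is a made-up placeholder for input() so the example runs unattended, and the heartbeat task is only there to show that the loop stays responsive.

```python
import asyncio
import time


def blocking_call():
    time.sleep(0.2)  # placeholder for input() or other blocking code
    return 42


async def just_wait():
    # Equivalent in spirit to loop.run_in_executor(None, blocking_call)
    return await asyncio.to_thread(blocking_call)


async def main():
    ticks = []

    async def heartbeat():
        # Keeps ticking only if the event loop is not blocked.
        while True:
            ticks.append(time.monotonic())
            await asyncio.sleep(0.01)

    beat = asyncio.create_task(heartbeat())
    try:
        value = await just_wait()
    finally:
        beat.cancel()
    return value, len(ticks)


value, n_ticks = asyncio.run(main())
print(value, n_ticks)
```

The heartbeat records several ticks while blocking_call sleeps, which would not happen if the blocking call ran directly inside the coroutine.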