Search code examples
python-asyncio

Can I add a task to an asyncio loop that is already running a protocol server?


Or, in other words, why does this code never print Hello, world:

import asyncio
import threading
import time

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)

loop = None
def serve():
    async def _serve():
        server = await asyncio.get_running_loop().create_server(EchoProtocol, '::1', 2000)
        await server.serve_forever()

    global loop
    loop = asyncio.new_event_loop()
    loop.run_until_complete(_serve())

async def hello():
    print("Hello, world", flush=True)

def main():
    t = threading.Thread(target=serve)
    t.start()
    time.sleep(0.1)
    loop.call_soon_threadsafe(hello)
    t.join()

main()

This sets up a TCP echo server on port 2000 which works as intended. But the call to call_soon_threadsafe() doesn't seem to actually insert the new coroutine into the thread loop running in the thread.


Solution

  • Your problem is simply that call_soon_thread_safe takes a regular function, not an async function.

    The function is called in the other thread as part of the scheduling of the asyncio loop, and, if needed it can call asyncio.get_running_loop and then iterate with it.

    It is possible then to call loop.create_task to add new co-routines to be executed in the running loop - but not to do asyncio.run or loop.run_until_complete.

    In other words, the task can be created in the synchronous function added as a callback from other thread, but you can't await for it - for few tasks, since the loop is running in the server, it will just work with "fire and forget":

    async def hello_async():
        await asyncio.sleep(2)
        print("async hello")
    
    def hello():
        print("Hello, world")
        loop = asyncio.get_running_loop()
        task = loop.create_task(hello_async())
        return
    

    (these two functions added in your listing will run both print statements: note that "hello" has changed from "async def" to just "def")

    If you need a more elaborate thing, ensuring tasks from a foreign thread are executed and retrieving their results, it is best to have a permanent task checking a global structure where newly created tasks are added like:

    import asyncio
    import threading
    import time
    
    class EchoProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            self.transport = transport
    
        def data_received(self, data):
            self.transport.write(data)
    
    loop = None
    def serve():
        async def _serve():
            server = await asyncio.get_running_loop().create_server(EchoProtocol, '::1', 2000)
            await server.serve_forever()
    
        global loop
        loop = asyncio.new_event_loop()
        loop.create_task(foreign_task_worker())
        loop.run_until_complete(_serve())
    
    
    task_registry = []
    
    async def foreign_task_worker():
        while True:
            if task_registry:
                # The quid-pro-quo with task_registry is so
                # to ensure new tasks are not ignored
                tasks = task_registry[:]
                task_registry[:] = []
                results = asyncio.gather(*tasks)
                # do something with results
                ...
            await asyncio.sleep(0.1)
    
    
    async def hello_async():
        await asyncio.sleep(2)
        print("async hello")
    
    def hello():
        print("Hello, world", flush=True)
        loop = asyncio.get_running_loop()
        task_registry.append(loop.create_task(hello_async()))
    
    
    def main():
        t = threading.Thread(target=serve)
        t.start()
        time.sleep(0.1)
        loop.call_soon_threadsafe(hello)
        t.join()
    
    main()
    

    Note that asyncio won't complain if you do call loop.create_task passing a co-routine from the main thread - and it will even work most of times . However, the inner workings of the loop itself are not thread-safe, and eventually doing this will add a task in a point it will wreck the loop internal state.