python, python-asyncio

How to enforce processing of async message handlers in the order the messages are received?


Suppose we have a message handler. Messages might be received from a TCP socket and dispatched to a handler:

async def on_recv(msg):
    await handle(msg)

while True:
    msg = await socket.recv()
    await on_recv(msg)

This is problematic: handle may take a long time, or it may send another message and wait on the socket for the response. In the second case we deadlock, because on_recv has not returned, so the loop never gets back to socket.recv() to read that response. To prevent this, we can run the handler as a task so that it does not block us from receiving the next message:
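The deadlock can be reproduced without a network. The following is a minimal sketch, assuming a hypothetical in-memory `asyncio.Queue` in place of the socket and a hypothetical `pending` dict of futures through which the receive loop hands each reply to the handler waiting for it. With `inline=True` the receive loop is stuck inside `handle()` and never reads the reply; with `inline=False` the handler runs as a task and the loop keeps receiving.

```python
import asyncio


async def run_dispatcher(inline: bool, timeout: float = 0.2) -> str:
    """Return "completed" if the handler finishes, "deadlocked" if it times out."""
    inbox: asyncio.Queue[str] = asyncio.Queue()  # hypothetical stand-in for the socket
    pending: dict[str, asyncio.Future[str]] = {}  # replies that handlers are awaiting
    handled = asyncio.Event()
    background: set[asyncio.Task[None]] = set()  # keep references to spawned tasks

    async def handle(msg: str) -> None:
        if msg == "request":
            reply = asyncio.get_running_loop().create_future()
            pending["reply"] = reply
            await inbox.put("reply")  # "send"; the simulated peer answers via the inbox
            await reply  # resolves only once the receive loop reads the reply
            handled.set()

    async def receive_loop() -> None:
        while True:
            msg = await inbox.get()
            if msg in pending:
                pending.pop(msg).set_result(msg)  # route the reply to its waiter
            elif inline:
                await handle(msg)  # original version: the loop stops reading here
            else:
                task = asyncio.create_task(handle(msg))  # task version
                background.add(task)
                task.add_done_callback(background.discard)

    loop_task = asyncio.create_task(receive_loop())
    await inbox.put("request")
    try:
        await asyncio.wait_for(handled.wait(), timeout)
        return "completed"
    except asyncio.TimeoutError:
        return "deadlocked"
    finally:
        # Tear down the receive loop and any handler tasks still running.
        loop_task.cancel()
        for t in list(background):
            t.cancel()
        await asyncio.gather(loop_task, *background, return_exceptions=True)


print(asyncio.run(run_dispatcher(inline=True)))   # deadlocked
print(asyncio.run(run_dispatcher(inline=False)))  # completed
```

The timeout only exists so the sketch terminates; in the real dispatcher the inline version would simply hang.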

async def on_recv(msg):
    asyncio.create_task(handle(msg))  # in real code, keep a reference so the task is not garbage-collected

However, I believe this means we lose guarantees about the order of processing (specifically, the order in which the tasks start). Suppose I want each handler to start its execution in the order the messages were received. How might I do that? My goal is to introduce more deterministic behavior (it's OK if the order isn't guaranteed once a handler hits an I/O boundary).
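To make the concern concrete, here is a small sketch (the integer messages and the delays are hypothetical stand-ins for real messages and handler I/O) that spawns one task per message in receipt order and records when each handler starts and finishes. On CPython's default event loop the handlers start in creation order, since each new task is scheduled with `call_soon` and ready callbacks run first-in, first-out; but as soon as a handler awaits, the handlers interleave and finish in a different order than the messages arrived.

```python
import asyncio

DELAYS = [0.03, 0.01, 0.02]  # hypothetical handling times for messages 0, 1, 2


async def main() -> tuple[list[int], list[int]]:
    started: list[int] = []
    finished: list[int] = []

    async def handle(msg: int) -> None:
        started.append(msg)
        await asyncio.sleep(DELAYS[msg])  # stands in for I/O inside the handler
        finished.append(msg)

    # One task per message, created in the order the messages were "received".
    tasks = [asyncio.create_task(handle(i)) for i in range(len(DELAYS))]
    await asyncio.gather(*tasks)
    return started, finished


started, finished = asyncio.run(main())
print(f"started:  {started}")   # started:  [0, 1, 2]
print(f"finished: {finished}")  # finished: [1, 2, 0]
```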

Trying to find a solution feels like a finger-trap puzzle: everything I try either leaves some messages potentially never starting, or involves a scheduler that has the same issue as the original dispatcher (ceding order to an asyncio.Task).


Solution

  • A common pattern is to use a queue together with any number of producers (that put stuff into the queue) and consumers (that take stuff out of the queue).

    In this case the obvious choice is asyncio.Queue. Because a queue is first-in, first-out, you can have (for example) your main coroutine receive messages and put them into the queue (thus fixing the order), while a single consumer task takes them out and handles them one at a time.

    Here is a little demo for you:

    from asyncio import CancelledError, Queue, create_task, run, sleep
    from random import randint
    
    async def receive_message(number: int) -> str:
        seconds = randint(1, 5)
        await sleep(seconds)
        message = f"message {number}"
        print(f"{message} took {seconds} seconds to receive")
        return message
    
    async def handle(message: str) -> None:
        seconds = randint(1, 5)
        await sleep(seconds)
        print(f"processed: {message} (took {seconds} seconds)")
    
    async def consumer(q: Queue) -> None:
        while True:
            try:
                next_message = await q.get()
            except CancelledError:
                print("stopping consumer...")
                return
            await handle(next_message)
    
    async def main() -> None:
        q = Queue()
        consumer_task = create_task(consumer(q))
        for i in range(5):
            message = await receive_message(i)
            q.put_nowait(message)
        await consumer_task  # will block forever
    
    if __name__ == "__main__":
        run(main())
    

    Sample output (stopped with SIGINT after the last message was processed):

    message 0 took 1 seconds to receive
    processed: message 0 (took 1 seconds)
    message 1 took 2 seconds to receive
    message 2 took 1 seconds to receive
    processed: message 1 (took 5 seconds)
    message 3 took 4 seconds to receive
    message 4 took 2 seconds to receive
    processed: message 2 (took 3 seconds)
    processed: message 3 (took 2 seconds)
    processed: message 4 (took 2 seconds)
    stopping consumer...
    

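    As you can easily verify with a few experiments (and prove from the structure of the code), messages are always processed in the order they are received (0 to 4): the main coroutine enqueues each message as soon as it arrives, the queue hands them out in that same order, and the single consumer awaits handle() for one message before it takes the next one off the queue. The flip side is that handlers never overlap, so one slow handler delays every message behind it.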
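The demo above only stops when interrupted. Here is a minimal variant, assuming a hypothetical `None` sentinel to mark the end of input and fixed delays in place of `randint`, that terminates on its own so the ordering claim can be checked with an assertion. The delays are deliberately chosen so that later messages would finish first if each one ran as its own task.

```python
from __future__ import annotations

import asyncio
from typing import Optional

DELAYS = [0.03, 0.01, 0.02]  # hypothetical handling times: later messages are faster
STOP = None  # hypothetical sentinel meaning "no more messages"


async def handle(number: int, processed: list[int]) -> None:
    await asyncio.sleep(DELAYS[number])  # stands in for real work / I/O
    processed.append(number)


async def consumer(q: asyncio.Queue[Optional[int]], processed: list[int]) -> None:
    while True:
        number = await q.get()
        if number is STOP:
            return  # sentinel reached: every earlier message has been handled
        await handle(number, processed)  # finish this message before taking the next


async def main() -> list[int]:
    q: asyncio.Queue[Optional[int]] = asyncio.Queue()
    processed: list[int] = []
    consumer_task = asyncio.create_task(consumer(q, processed))
    for number in range(len(DELAYS)):
        await asyncio.sleep(0)  # stands in for awaiting socket.recv()
        q.put_nowait(number)  # enqueue in order of receipt
    q.put_nowait(STOP)  # enqueued last, so the consumer sees it after every message
    await consumer_task  # returns instead of blocking forever
    return processed


print(asyncio.run(main()))  # [0, 1, 2]
```

A sentinel is only one way to shut the consumer down; cancelling the task, as the original demo effectively does on SIGINT, also works when you do not need to drain the queue first.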