
1-item asyncio queue - is this some standard thing?


In one of my asyncio projects I use one synchronisation method quite a lot, and I was wondering whether it is some kind of standard tool with a name I could give to Google to learn more. I used the term "1-item queue" only because I don't have a better name. It is a degraded queue and it is NOT related to Queue(maxsize=1).

#  [controller] ---- commands ---> [worker]

The controller sends commands to a worker (queue.put, actually put_nowait) and the worker waits for them (queue.get) and executes them, but the special rule is that only the last command is important and it immediately replaces all prior unfinished commands. For this reason, there is never more than 1 command waiting for execution in the queue.

To implement this, the controller clears the queue before the put. There is no queue.clear, so it must discard (with get_nowait) the waiting item, if any. (The absence of queue.clear started my doubts resulting in this question.)
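As an illustration only, the controller side described above might look like the sketch below. The helper name send_latest and the demo coroutine are hypothetical and not part of any library; only asyncio.Queue, get_nowait, put_nowait and asyncio.QueueEmpty come from asyncio.

```python
import asyncio


def send_latest(queue: asyncio.Queue, command) -> None:
    """Hypothetical helper: discard any waiting command, then enqueue the new one."""
    while True:
        try:
            queue.get_nowait()  # throw away a command the worker has not picked up yet
        except asyncio.QueueEmpty:
            break
    queue.put_nowait(command)


async def demo():
    queue = asyncio.Queue()
    for cmd in ("start", "pause", "stop"):
        send_latest(queue, cmd)  # each call replaces the previous pending command
    return queue.qsize(), await queue.get()
```

Because the helper contains no await, nothing else can run between the clearing loop and the put, so the queue never holds more than one item.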

On the worker's side, if a command execution requires a sleep, the sleep is replaced by a newcmd=queue.get with a timeout. If the timeout expires, the call simply acted as a sleep; if the get succeeds, the current work is aborted and the execution of newcmd starts.
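The interruptible sleep on the worker side could be sketched roughly like this, assuming asyncio.wait_for is used for the timeout. The function name sleep_or_new_command is made up for this example.

```python
import asyncio


async def sleep_or_new_command(queue: asyncio.Queue, delay: float):
    """Hypothetical helper: wait up to `delay` seconds for a new command.

    Returns the new command if one arrives in time, otherwise None.
    """
    try:
        return await asyncio.wait_for(queue.get(), timeout=delay)
    except asyncio.TimeoutError:
        return None  # nothing arrived, so this was just a sleep


async def demo():
    queue = asyncio.Queue()
    first = await sleep_or_new_command(queue, 0.01)   # empty queue: times out
    queue.put_nowait("stop")
    second = await sleep_or_new_command(queue, 0.01)  # command is already waiting
    return first, second
```

A worker would abort its current command whenever the helper returns something other than None and start executing the returned command instead.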


Solution

  • The type of queue you are using is not standard - there is such a thing as a one-shot queue, but it's a different thing altogether.

    The queue doesn't really fit your use case, though you made it work with some effort. You don't really need queuing of any kind; you need a slot that holds a single object (which can be replaced) and a wakeup mechanism. asyncio.Event can be used for the wakeup, and you can attach the payload object (the command) to an attribute of the event. For example:

    import asyncio
    import itertools

    async def worker(evt):
        while True:
            await evt.wait()
            evt.clear()
            if evt.last_command is None:
                # another worker already took the command
                continue
            last_command = evt.last_command
            evt.last_command = None
            # execute last_command, possibly with timeout
            print(last_command)

    async def main():
        evt = asyncio.Event()
        evt.last_command = None  # the single "slot" holding the newest command
        workers = [asyncio.create_task(worker(evt)) for _ in range(5)]
        for i in itertools.count():
            await asyncio.sleep(1)
            evt.last_command = f"foo {i}"  # overwrites any command not yet taken
            evt.set()

    asyncio.run(main())
    

    One difference between this and the queue-based approach is that setting the event will wake up all workers (if there is more than one), even if the first worker immediately calls evt.clear(). A queue item, on the other hand, will be guaranteed to be handed off to a single awaiter of queue.get().
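If waking every worker is a concern, one possible variation (not the approach shown above, just a sketch) is to build the slot on asyncio.Condition and notify a single waiter. The class name LatestSlot and its methods are hypothetical. Note the trade-off: put becomes a coroutine here, unlike put_nowait or evt.set().

```python
import asyncio

_EMPTY = object()  # sentinel meaning "no command pending"


class LatestSlot:
    """Hypothetical single-item slot: a new value replaces any pending one."""

    def __init__(self):
        self._cond = asyncio.Condition()
        self._value = _EMPTY

    async def put(self, value):
        async with self._cond:
            self._value = value   # overwrite whatever was pending
            self._cond.notify(1)  # wake at most one waiting worker

    async def get(self):
        async with self._cond:
            # Blocks until a value is present; returns at once if one already is.
            await self._cond.wait_for(lambda: self._value is not _EMPTY)
            value, self._value = self._value, _EMPTY
            return value


async def demo():
    slot = LatestSlot()
    await slot.put("a")
    await slot.put("b")  # replaces "a" before any worker saw it
    return await slot.get()
```

With a single worker, the Event-based version is simpler and is probably sufficient.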