
trio.Event(): Which is “better”: setting and initializing a new Event or checking if someone is waiting for it beforehand?


import trio

work_available = trio.Event()

async def get_work():
  while True:
    work = check_for_work()
    if not work:
      await work_available.wait()
    else:
      return work

def add_work_to_pile(...):
  global work_available  # must come before any use of the name in this function
  ...
  if work_available.statistics().tasks_waiting:
    work_available.set()
    work_available = trio.Event()

In this Python-like code example I get work in bursts via add_work_to_pile(). The workers that fetch work via get_work() are slow, so most of the time add_work_to_pile() is called there will be nobody waiting on work_available.

Which is better/cleaner/simpler/more pythonic/more trionic/more intended by the trio developers?

  • checking if someone is looking for the Event() via statistics().tasks_waiting, like in the example code, ...or...
  • unconditionally set()ting the Event and creating a new one each time? (Most of the set() calls would be in vain.)

Furthermore... the API does not really seem to expect regular code to check if someone is waiting via this statistics() call...

I don’t mind spending a couple more lines to make things clearer. But that goes both ways: a couple of extra CPU cycles are fine in exchange for simpler code...


Solution

  • Creating a new Event costs about the same as creating the _EventStatistics object inside the statistics() method, so you would have to profile your own code to pick out any difference in performance. More importantly, although calling statistics() is safe and cheap, its intent across trio's classes is debugging rather than core logic; creating and discarding many Event instances is closer to what the devs intended.

    A more trionic pattern would be to push each work item into a buffered memory channel in place of your add_work_to_pile() method, and then iterate over the channel in the task that currently awaits get_work(). The amount of code is comparable to your example:

    import trio
    
    send_chan, recv_chan = trio.open_memory_channel(float('inf'))
    
    async def task_that_uses_work_items():
        # # compare
        # while True:
        #     work = await get_work()
        #     handle_work(work)
        async for work in recv_chan:
            handle_work(work)
    
    def add_work_to_pile():
        ...
        for work in new_work_set:
            send_chan.send_nowait(work)
    
    # maybe your work is coming in from a thread? (capture the token while
    # inside the trio run: trio_token = trio.lowlevel.current_trio_token())
    def add_work_from_thread():
        ...
        for work in new_work_set:
            trio_token.run_sync_soon(send_chan.send_nowait, work)
    

    Furthermore, it's performant, because the work items are rotated through an efficient deque internally. Note that this code checkpoints for every work item, so you may have to jump through some hoops if you want to avoid that.
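    For example, one way to cut down on per-item checkpoints is to await only the first item and then drain whatever is already buffered with receive_nowait(), which raises trio.WouldBlock when the buffer is empty. A sketch (drain_ready is a name I'm introducing here, not a trio API):

    ```python
    import trio

    async def drain_ready(recv_chan):
        # await the first item (one checkpoint), then grab everything
        # already buffered without any further checkpoints
        batch = [await recv_chan.receive()]
        while True:
            try:
                batch.append(recv_chan.receive_nowait())
            except trio.WouldBlock:
                break
        return batch
    ```

    The consuming task can then call handle_work() on a whole batch per wakeup instead of once per item.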