I have a function `event(...)` in my `EventHandler` class that can be called at arbitrary times from my main code. It needs to put every incoming event into a queue and process them one at a time. How do I do this? I've been trying to use the asyncio library, but I can't work out how. If I make `event` async, I get the error that it was never awaited. If I don't make it async, I can't call the coroutine that processes the next item in the queue:
    async def _event(...):
        queue_item = await self._queue.get()
        ...
I need to be able to call `event` at any time, but the helper `_event` needs to process the items in the queue one at a time.
How do I solve this?
I tried making `event` synchronous:

    def event(...):
        """
        When an event happens, this will add it to the queue. Multiple events can happen, and they will be executed one at a time, as they come in. Later on, we can change this.
        """
        self._queue.put_nowait((flag, *args))
        task = self._loop.create_task(self._event())
        asyncio.ensure_future(task)

but then the error that comes up is that the task was not awaited.
Pending more details from the poster, here is an example that has a `MainThreadClass` running on the main thread of a program and an asyncio event loop running on another thread. When `MainThreadClass.event` is called, it sends a message to the event loop via an `asyncio.Queue` and `asyncio.run_coroutine_threadsafe`. From there, a coroutine on the event loop listens continuously for messages on the queue and processes each one, printing it to the console.
    import asyncio
    from threading import Thread


    class MainThreadClass:
        def __init__(self):
            # Create an `asyncio.Queue` so that the main thread can send messages
            # to the `asyncio` event loop via this queue and `asyncio.run_coroutine_threadsafe`
            self._async_queue = asyncio.Queue()
            self._async_event_loop = asyncio.new_event_loop()

        def event(self, some_incoming_data):
            # You can't call `put_nowait` directly here because `asyncio.Queue`
            # methods are not thread-safe; schedule the `put` on the loop instead
            asyncio.run_coroutine_threadsafe(
                self._async_queue.put(some_incoming_data), self._async_event_loop
            )


    async def listen_to_queue(queue: asyncio.Queue):
        # Single consumer: messages are handled one at a time, in arrival order
        while True:
            message = await queue.get()
            print(f"async event loop received: {message}")
            # ... do something with `message` ...


    async def async_main(queue: asyncio.Queue):
        await asyncio.gather(listen_to_queue(queue))


    def start_asyncio_event_loop(loop: asyncio.AbstractEventLoop) -> None:
        """Starts the given `asyncio` loop on whatever the current thread is"""
        asyncio.set_event_loop(loop)
        loop.run_forever()


    def run_event_loop(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop) -> None:
        """Runs the given `asyncio` loop on a separate thread, passing the queue
        to the event loop so that any other thread can send messages to it.
        """
        thread = Thread(target=start_asyncio_event_loop, args=(loop,))
        thread.start()
        # Schedule the listener on the loop that is now running on the other thread
        asyncio.run_coroutine_threadsafe(async_main(queue), loop=loop)


    if __name__ == "__main__":
        main_thread_class = MainThreadClass()
        async_queue = main_thread_class._async_queue
        async_event_loop = main_thread_class._async_event_loop
        run_event_loop(queue=async_queue, loop=async_event_loop)
        main_thread_class.event("testing")
This program prints the following to the console:
async event loop received: testing
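
If the code that calls `event` already runs on the event loop's own thread (an assumption, since the question doesn't say), a second thread may not be needed. A minimal sketch of that variant: `event` stays a plain function that only enqueues, and a single long-lived consumer task, started once, takes items off the queue one at a time. The names `start`, `_process_events`, `_handle`, `drain_and_stop` and `processed` are hypothetical and exist only for illustration.

```python
import asyncio
from typing import List, Optional, Tuple


class EventHandler:
    """Hypothetical handler: `event` is synchronous, one task drains the queue."""

    def __init__(self) -> None:
        # Assumes the handler is constructed while the event loop is running.
        self._queue: asyncio.Queue = asyncio.Queue()
        self._worker: Optional[asyncio.Task] = None
        self.processed: List[Tuple] = []

    def start(self) -> None:
        # Create the consumer task once, instead of once per event.
        loop = asyncio.get_running_loop()
        self._worker = loop.create_task(self._process_events())

    def event(self, flag, *args) -> None:
        # Plain function, nothing to await. Only safe when called from the
        # event loop's thread, because `asyncio.Queue` is not thread-safe.
        self._queue.put_nowait((flag, *args))

    async def _process_events(self) -> None:
        while True:
            flag, *args = await self._queue.get()
            try:
                await self._handle(flag, *args)
            finally:
                # Mark the item done so `queue.join()` can return.
                self._queue.task_done()

    async def _handle(self, flag, *args) -> None:
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        self.processed.append((flag, *args))

    async def drain_and_stop(self) -> None:
        # Wait until every queued item has been handled, then stop the worker.
        await self._queue.join()
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass


async def main() -> List[Tuple]:
    handler = EventHandler()
    handler.start()
    handler.event("connect", "alice")
    handler.event("message", "alice", "hello")
    handler.event("disconnect", "alice")
    await handler.drain_and_stop()
    return handler.processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Run as a script, this prints `[('connect', 'alice'), ('message', 'alice', 'hello'), ('disconnect', 'alice')]`, i.e. the events in the order they were queued. Because only one consumer task exists, the next item is not taken off the queue until the previous one has finished. If `event` can be called from other threads, the `run_coroutine_threadsafe` approach above is the safer choice.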