This code prints all messages from a websocket connection:
class OrderStreamer:
def __init__(ᬑ):
ᬑ.terminate_flag = False
# worker thread to receive data stream
ᬑ.worker_thread = threading.Thread(
target=ᬑ.worker_thread_func,
daemon=True
)
def start_streaming(ᬑ, from_scheduler = False):
ᬑ.worker_thread.start()
def terminate(ᬑ):
ᬑ.terminate_flag = True
def worker_thread_func(ᬑ):
asyncio.run(ᬑ.aio_func()) # blocks
async def aio_func(ᬑ):
async with \
aiohttp.ClientSession() as session, \
session.ws_connect(streams_url) as wsock, \
anyio.create_task_group() as tg:
async for msg in wsock:
print(msg.data)
if ᬑ.terminate_flag:
await wsock.close()
The problem is that if no messages arrive, the loop never gets the chance to check terminate_flag
and never exits.
I tried creating an external reference to the runloop and websocket:
async with \
aiohttp.ClientSession() as session, \
session.ws_connect(streams_url) as wsock, \
anyio.create_task_group() as tg:
ᬑ.wsock = wsock
ᬑ.loop = asyncio.get_event_loop()
... and modifying my terminate function:
def terminate(ᬑ):
# ᬑ.loop.stop()
asyncio.set_event_loop(ᬑ.loop)
async def kill():
await ᬑ.wsock.close()
asyncio.run(kill())
... but it does not work.
I can't afford to rearchitect my entire application to use asyncio at this point in time.
How to break out of the loop?
You should use asyncio.wait_for
or asyncio.wait
and call wsock.__anext__()
directly instead of using async for
loop.
The loop with asyncio.wait
should look something like this:
next_message = asyncio.create_task(wsock.__anext__())
while not self.terminate_flag:
await asyncio.wait([next_message], timeout=SOME_TIMEOUT,)
if next_message.done():
try:
msg = next_message.result()
except StopAsyncIteration:
break
else:
print(msg.data)
next_message = asyncio.create_task(wsock.__anext__())
SOME_TIMEOUT
should be replaced with the amount of seconds you want to wait continuously for the next incoming message
Here is the documentation for asyncio.wait
P.S. I replaced ᬑ
with self
, but I hope you get the idea