I have some code for communicating over a websocket connection. I want to send a message to the server and get back a future that resolves with the data sent by the server once the server sends an end signal. The code collecting the responses should run in the background, in a non-blocking way, collecting responses until the end signal arrives, which should trigger the future to resolve. Several of these requests should be able to run concurrently.
The code I am using to do this is below:
import asyncio
import websocket
from pyee.base import EventEmitter
import json
def done_callback(future):
    """Re-raise whatever exception a finished future holds.

    Meant to be registered with ``add_done_callback`` so that an exception
    raised inside the wrapped coroutine is not left unobserved on the future.

    Args:
        future: A completed future-like object exposing ``result()``.

    Raises:
        BaseException: Whatever ``future.result()`` raises.
    """
    # The return value is deliberately discarded; only the raise matters.
    future.result()
class WebSocketResponder(EventEmitter):
    """Event emitter that turns server payloads into ``data`` / ``end`` events."""

    def __init__(self):
        super().__init__()

    async def on_response(self, response):
        """Emit the event that corresponds to one received payload.

        Args:
            response: The payload addressed to this responder. The literal
                string ``"end"`` marks the end of the stream and produces an
                ``end`` event; anything else is forwarded in a ``data`` event.
        """
        print("reading response: ", response)
        if response == "end":
            print("emitting end")
            self.emit("end")
            return
        print("got non end response: ", response)
        self.emit("data", response)
class Processor:
    """Send requests over one websocket and route replies to responders.

    Every request is tagged with an integer id and paired with a
    ``WebSocketResponder``. Incoming messages are JSON objects with ``"id"``
    and ``"msg"`` keys and are handed to the responder stored under that id.
    """

    def __init__(self, wsApiUrl) -> None:
        """Connect to ``wsApiUrl`` and schedule the receive loop.

        Must be called while an event loop is running, because it looks the
        loop up with ``asyncio.get_running_loop()``.
        """
        self.wsApiUrl = wsApiUrl
        # websocket.enableTrace(True) # uncomment for detailed trace of websocket communications
        self.websocket = websocket.WebSocket()
        self.websocket.connect(self.wsApiUrl)

        running_loop = asyncio.get_running_loop()
        self.receiving_coroutine = asyncio.run_coroutine_threadsafe(
            self._receive_forever(), running_loop
        )
        # Re-raise exceptions that occur inside the receive loop.
        self.receiving_coroutine.add_done_callback(done_callback)

        self.responders = {}
        self.responder_count = 0

    async def _receive_forever(self):
        """Read messages from the socket and dispatch each one, without end.

        NOTE(review): ``recv()`` is called without ``await``, so it runs
        synchronously on the event-loop thread, and as written none of the
        awaited calls below it suspends. Once this loop starts, no other
        coroutine can run, including a caller awaiting the future returned
        by ``send``.
        """
        while True:
            await self.process(self.websocket.recv())

    async def process(self, response):
        """Decode one raw message and forward it to its responder.

        Args:
            response: JSON text of an object with ``"id"`` and ``"msg"`` keys.

        Raises:
            KeyError: If a key is missing or no responder has that id.
        """
        decoded = json.loads(response)
        responder_id, msg = decoded["id"], decoded["msg"]
        await self.responders[responder_id].on_response(msg)

    async def send(self, msg):
        """Send ``msg`` to the server and return a future for the full reply.

        The future's result is every ``data`` payload received for this
        request, each followed by a newline, set when ``end`` is emitted.
        """
        future = asyncio.Future()
        response = ""
        responder = WebSocketResponder()

        request_id = self.responder_count
        self.responders[request_id] = responder
        self.websocket.send(json.dumps({"id": request_id, "msg": msg}))
        self.responder_count += 1  # next request gets a fresh id

        @responder.on("data")
        def data_handler(payload):
            nonlocal response
            print("adding to response: ", payload)
            response += payload + "\n"

        @responder.on("end")
        def end_handler():
            print("end handler triggered")
            print("setting result: ", response)
            future.set_result(response)

        return future
async def doTheThing():
    """Send one greeting to the local server and return the collected reply."""
    processor = Processor("ws://127.0.0.1:7890")
    pending_reply = await processor.send("Hello, Server")
    return await pending_reply
async def main():
    """Run the demo request and print what came back."""
    print("result: ", await doTheThing())
if __name__ == '__main__':
    # asyncio.run() creates, runs and closes the event loop itself. Calling
    # asyncio.get_event_loop() when no loop has been set is deprecated.
    asyncio.run(main())
The code running the websocket server which is responding to the messages is here:
import websockets
import asyncio
import json
# Server data
PORT = 7890
print(f"Server listening on Port {PORT}")

# A set of connected ws clients
connected = set()
# The main behavior function for this server
async def echo(websocket, path):
    """Serve one client connection until it closes.

    Each incoming message must be a JSON object with ``"id"`` and ``"msg"``
    keys. The sender gets three replies carrying the same id: a thank-you,
    a follow-up, and the ``"end"`` marker. Every other connected client gets
    a single "Someone said" notification.

    Args:
        websocket: The connection for this client.
        path: Part of the handler signature; not used here.
    """
    print("A client just connected")
    # Store a copy of the connected client
    connected.add(websocket)
    # Handle incoming messages
    try:
        async for message in websocket:
            print("Received message from client: " + message)
            loaded_message = json.loads(message)
            message_id = loaded_message["id"]
            msg_received = loaded_message["msg"]
            # Iterate over a snapshot: every ``await`` below lets other
            # handlers add to or remove from ``connected``, and changing a
            # set while iterating over it raises RuntimeError.
            for conn in tuple(connected):
                if conn != websocket:
                    # Send a response to all connected clients except sender
                    print("responding to another websocket")
                    await conn.send(json.dumps({"id": message_id, "msg": f"Someone said: {msg_received}"}))
                else:
                    print("responding to sender")
                    await conn.send(json.dumps({"id": message_id, "msg": f"Thanks for your message: {msg_received}"}))
                    print("sending more")
                    await conn.send(json.dumps({"id": message_id, "msg": "Do you get this?"}))
                    print("sending end")
                    await conn.send(json.dumps({"id": message_id, "msg": "end"}))
                    print("end sent")
    # Handle disconnecting clients
    except websockets.exceptions.ConnectionClosed:
        print("A client just disconnected")
    finally:
        connected.remove(websocket)
# Start the server
# Create and install the event loop explicitly: relying on
# asyncio.get_event_loop() to create one implicitly is deprecated.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(echo, "localhost", PORT)
loop.run_until_complete(start_server)
loop.run_forever()
This is a simplified mock-up of a much more complex system; the websocket server is a separate service implemented by a third party.
I create an instance of an event emitter class which is used to react to responses from the websocket connection; I then send a message that includes the id of this instance.
There is a coroutine / task which runs an async function that should run forever: it receives messages from the websocket and runs an async function to process each message. This process function retrieves the event emitter object and runs the response function on the event emitter instance to process the response. The event emitter emits data events, which trigger a callback registered in the send call, and once the server sends an end signal, the event emitter emits an end event, which triggers a callback that sets the result of the future created by the send call.
This code runs as expected, except when the end_handler is triggered, it prints the expected messages but the future doesn't resolve, the await call just hangs. I expect the issue is related to the set_result function being run in another thread / event loop, but I have been unable to resolve the issue, despite trying various methods.
The solution was to remove the manual loop assignment and to move the creation of the task inside the WebSocketResponder class, so that each responder has its own task running its own on_message function, and also to make sure that the on_message function completes and returns once no more messages need to be received.
The following code works as expected, receiving and aggregating the responses from the server and then, once it receives the end message from the server, it emits an end event and sets the complete flag on the responder object to true. This causes the on_message function to return, which allows the receiving_task to complete. The end_handler then runs and sets the future result, which causes the doTheThing function to return the result from the future.
import asyncio
import websocket
from pyee.base import EventEmitter
import json
class WebSocketResponder(EventEmitter):
    """Event emitter that owns a task reading replies until ``end`` is seen.

    The task repeatedly reads from the websocket and passes each raw message
    to ``process``. It stops looping once ``complete`` has been set, which
    ``on_response`` does after emitting ``end``.
    """

    def __init__(self, id, websocket, process):
        """Store the collaborators and start the receive task.

        Must be called while an event loop is running, because it calls
        ``asyncio.create_task``.

        Args:
            id: Identifier of the request this responder belongs to.
            websocket: Object whose ``recv()`` returns the next raw message.
            process: Coroutine function that takes one raw message.
        """
        super().__init__()
        self.id = id
        self.websocket = websocket
        self.process = process
        self.complete = False
        task_name = f"on_message for responder {self.id}"
        self.receiving_task = asyncio.create_task(self.on_message(), name=task_name)
        # Re-raise exceptions that occur inside the receive task.
        self.receiving_task.add_done_callback(self.done_callback)

    def done_callback(self, future):
        """Re-raise the exception held by the finished task, if any."""
        future.result()

    async def on_message(self):
        """Read and dispatch messages until ``complete`` becomes true.

        NOTE(review): ``recv()`` is called without ``await``, so it runs
        synchronously on the event-loop thread while it waits for data.
        """
        while not self.complete:
            raw_message = self.websocket.recv()
            await self.process(raw_message)

    async def on_response(self, response):
        """Emit the event for one payload; ``"end"`` also marks completion."""
        print("reading response: ", response)
        if response == "end":
            print("emitting end")
            self.emit("end")
            # Lets the loop in on_message finish so the task can complete.
            self.complete = True
            return
        print("got non end response: ", response)
        self.emit("data", response)
class Processor:
    """Send requests over one websocket and route replies to responders.

    Every request is tagged with an integer id and paired with a
    ``WebSocketResponder``. Incoming messages are JSON objects with ``"id"``
    and ``"msg"`` keys and are handed to the responder stored under that id.
    """

    def __init__(self, wsApiUrl) -> None:
        """Open a websocket connection to ``wsApiUrl``."""
        self.wsApiUrl = wsApiUrl
        # websocket.enableTrace(True) # uncomment for detailed trace of websocket communications
        self.websocket = websocket.WebSocket()
        self.websocket.connect(self.wsApiUrl)
        self.responders = {}
        self.responder_count = 0

    async def process(self, response):
        """Decode one raw message and forward it to its responder.

        Args:
            response: JSON text of an object with ``"id"`` and ``"msg"`` keys.

        Raises:
            KeyError: If a key is missing or no responder has that id.
        """
        loaded_response = json.loads(response)
        responder_id = loaded_response["id"]
        msg = loaded_response["msg"]
        await self.responders[responder_id].on_response(msg)

    async def send(self, msg):
        """Send ``msg`` to the server and return a future for the full reply.

        Args:
            msg: The message body to send.

        Returns:
            A future whose result is every ``data`` payload received for
            this request, each followed by a newline, set on ``end``.
        """
        request_id = self.responder_count
        responder = WebSocketResponder(request_id, self.websocket, self.process)
        future = asyncio.get_running_loop().create_future()
        lines = []

        # Attach the listeners before the request goes out, so a reply can
        # never be dispatched to a responder that has no handlers yet.
        @responder.on("data")
        def data_handler(payload):
            print("adding to response: ", payload)
            lines.append(payload + "\n")

        @responder.on("end")
        def end_handler():
            print("end handler triggered")
            response = "".join(lines)
            print("setting result: ", response)
            # The caller may have cancelled the future, or a second "end"
            # may arrive; set_result on a finished future would raise.
            if not future.done():
                future.set_result(response)

        self.responders[request_id] = responder
        self.websocket.send(json.dumps({"id": request_id, "msg": msg}))
        self.responder_count += 1  # increment responder count
        return future
async def doTheThing():
    """Send one greeting to the local server and return the collected reply."""
    processor = Processor("ws://127.0.0.1:7890")
    pending_reply = await processor.send("Hello, Server")
    return await pending_reply
async def main():
    """Run the demo request and print what came back."""
    print("result: ", await doTheThing())
if __name__ == "__main__":
    # Script entry point: asyncio.run() drives main() to completion.
    asyncio.run(main())