Search code examples
Tags: python, events, websocket, python-asyncio, future

awaiting a future which never resolves: set_result called by a callback triggered by an event emitter from a background coroutine / task


I have some code for communicating over a websocket connection. I want to send a message to the server and return a future which will resolve with the data sent by the server once the server sends an end signal. The code collecting the responses should run in the background, in a non-blocking way, collecting the responses until an end signal is received, which should trigger the future to resolve. Several of these requests should be able to run concurrently.

The code I am using to do this is below:

import asyncio
import websocket
from pyee.base import EventEmitter
import json

def done_callback(future):
    """Surface the outcome of a finished future.

    ``future.result()`` re-raises whatever exception the wrapped coroutine
    ended with; a successful result is simply discarded.
    """
    # Only the raising side effect of result() matters here.
    future.result()

class WebSocketResponder(EventEmitter):
    """Event emitter that relays the server messages of one request.

    Emits ``"data"`` with the payload for every message other than the
    literal ``"end"``, and emits ``"end"`` (without arguments) for that one.
    """

    def __init__(self):
        super().__init__()

    async def on_response(self, response):
        """Turn one server message into a ``data`` or an ``end`` event."""
        print("reading response: ", response)
        if response == "end":
            # Terminator: tell the listener no more payloads will follow.
            print("emitting end")
            self.emit("end")
            return
        # Ordinary payload: hand it to whoever subscribed to "data".
        print("got non end response: ", response)
        self.emit("data", response)

class Processor:
    """Owns one websocket connection and routes each incoming message to the
    responder registered under that message's ``id``.
    """

    def __init__(self, wsApiUrl) -> None:
        """Connect to ``wsApiUrl`` and schedule the receive loop.

        Calls ``asyncio.get_running_loop()``, so it has to be constructed
        while an event loop is running (e.g. from inside a coroutine).
        """
        self.wsApiUrl = wsApiUrl
        # websocket.enableTrace(True) # uncomment for detailed trace of websocket communications
        self.websocket = websocket.WebSocket()
        self.websocket.connect(self.wsApiUrl)
        async def on_message():
            # Receive loop: it has no exit condition, so it only stops if
            # recv() or process() raises.
            while True:
                # NOTE(review): recv() is not awaited. If it is a blocking call
                # (websocket-client's WebSocket.recv appears to be), the event
                # loop cannot run anything else while it waits -- including the
                # coroutine awaiting the future returned by send(). That would
                # explain the reported hang; confirm.
                response = self.websocket.recv()
                await self.process(response)

        loop = asyncio.get_running_loop()
        # Submits on_message() to the loop that is already running here; the
        # object returned by run_coroutine_threadsafe is a
        # concurrent.futures.Future.
        self.receiving_coroutine = asyncio.run_coroutine_threadsafe(on_message(), loop)
        self.receiving_coroutine.add_done_callback(done_callback) # adds callback to task which raises exceptions occurring in coroutine
        # Maps request id -> WebSocketResponder; ids come from responder_count.
        self.responders = {}
        self.responder_count = 0
        return
    
    async def process(self, response):
        """Decode one JSON message and pass its ``msg`` to the matching responder.

        Raises ``KeyError`` if the message lacks ``id``/``msg`` or if no
        responder is registered under that id.
        """
        loaded_response = json.loads(response)
        id = loaded_response["id"]
        msg = loaded_response["msg"]
        responder = self.responders[id]
        await responder.on_response(msg)
        return

    async def send(self, msg):
        """Send ``msg`` to the server and return a future for the full reply.

        The coroutine contains no ``await``; it returns the ``asyncio.Future``
        itself, so a caller has to await twice (once for this coroutine, once
        for the future). The future's result is every "data" payload, each
        followed by a newline, and is set when the responder emits "end".
        """
        future = asyncio.Future()
        response = ""

        # Register the responder under the id that is sent with the request.
        responder = WebSocketResponder()
        self.responders[self.responder_count] = responder

        self.websocket.send(json.dumps({"id": self.responder_count, "msg": msg}))
        
        self.responder_count += 1 # increment responder count
        @responder.on("data")
        def data_handler(payload):
            # Accumulate payloads, one per line.
            nonlocal response, future
            print("adding to response: ", payload)
            response += payload
            response += "\n"

        @responder.on("end")
        def end_handler():
            # Resolve the future with everything collected so far.
            print("end handler triggered")
            nonlocal response, future
            print("setting result: ", response)
            future.set_result(response)

        return future

async def doTheThing():
    """Send one greeting through a fresh Processor and return the collected reply."""
    processor = Processor("ws://127.0.0.1:7890")

    # send() hands back a future; awaiting that future yields the reply text.
    pending = await processor.send("Hello, Server")
    reply = await pending
    return reply

async def main():
    """Run the demo request and print what came back."""
    print("result: ", await doTheThing())


if __name__ == "__main__":
    # Drive main() to completion on the current event loop.
    asyncio.get_event_loop().run_until_complete(main())

The code running the websocket server which is responding to the messages is here:

import websockets
import asyncio
import json

# Port the server listens on
PORT = 7890
print(f"Server listening on Port {PORT}")

# Every websocket client that is currently connected
connected = set()

# The main behavior function for this server
async def echo(websocket, path=None):
    """Serve one client: answer its messages and notify every other client.

    Each incoming message must be a JSON object with ``id`` and ``msg`` keys.
    The sender receives three replies carrying the same ``id`` (an
    acknowledgement, a follow-up, and the literal ``"end"``); every other
    connected client receives one "Someone said" notification.

    Args:
        websocket: The connection belonging to this client.
        path: Unused. Optional so the handler can be invoked either as
            ``echo(websocket, path)`` or as ``echo(websocket)``.
    """
    print("A client just connected")
    # Store a copy of the connected client
    connected.add(websocket)
    # Handle incoming messages
    try:
        async for message in websocket:
            print("Received message from client: " + message)
            loaded_message = json.loads(message)
            msg_id = loaded_message["id"]
            msg_received = loaded_message["msg"]
            # Iterate over a snapshot: every send below awaits, and other
            # handlers may add or remove clients in the meantime, which would
            # raise "Set changed size during iteration" on the live set.
            for conn in tuple(connected):
                if conn != websocket:
                    print("responding to another websocket")
                    try:
                        await conn.send(json.dumps({"id": msg_id, "msg": f"Someone said: {msg_received}"}))
                    except websockets.exceptions.ConnectionClosed:
                        # That peer went away; its own handler removes it.
                        # Do not let it end this sender's session.
                        continue
                else:
                    print("responding to sender")
                    await conn.send(json.dumps({"id": msg_id, "msg": f"Thanks for your message: {msg_received}"}))
                    print("sending more")
                    await conn.send(json.dumps({"id": msg_id, "msg": "Do you get this?"}))
                    print("sending end")
                    await conn.send(json.dumps({"id": msg_id, "msg": "end"}))
                    print("end sent")
    # Handle disconnecting clients
    except websockets.exceptions.ConnectionClosed:
        print("A client just disconnected")
    finally:
        connected.discard(websocket)

# Start the server
async def serve_forever():
    """Run the echo server on localhost:PORT until the process is stopped."""
    # Create the server inside a running loop and close it cleanly on exit.
    async with websockets.serve(echo, "localhost", PORT):
        # A future that is never resolved keeps the server alive.
        await asyncio.Future()


asyncio.run(serve_forever())

This is a simplified mock up of a much more complex system, the websocket server being a separate service implemented by a third party external to myself.

I create an instance of an event emitter class which is used to react to responses from the websocket connection. I then send a message that carries the id of this instance.

There is a coroutine / task running an async function that should run forever: it receives messages from the websocket and calls an async function to process each one. This process function looks up the event emitter object and calls its response function to handle the message. The event emitter emits data events, which trigger a callback registered in the send call, and once the server sends an end signal, the event emitter emits an end event, which triggers a callback that sets the result of the future created by the send call.

This code runs as expected, except when the end_handler is triggered, it prints the expected messages but the future doesn't resolve, the await call just hangs. I expect the issue is related to the set_result function being run in another thread / event loop, but I have been unable to resolve the issue, despite trying various methods.


Solution

  • The solution was to remove the manual loop assignment and to move the creation of the task inside the WebSocketResponder class, so each responder has its own task running its own on_message function, and also to make sure that the on_message function completes and returns once no more messages need to be received.

    The following code works as expected, receiving and aggregating the responses from the server and then, once it receives the end message from the server, it emits an end event and sets the complete flag on the responder object to true. This causes the on_message function to return, which allows the receiving_task to complete. The end_handler then runs and sets the future result, which causes the doTheThing function to return the result from the future.

    import asyncio
    import websocket
    from pyee.base import EventEmitter
    import json
    
    
    
    class WebSocketResponder(EventEmitter):
        """Event emitter that also runs the receive loop for one request.

        Construction schedules ``on_message`` as a task. The loop keeps
        reading from the websocket until ``complete`` is set, which happens
        when an ``"end"`` message is handled by ``on_response``.
        """

        def __init__(self, id, websocket, process):
            super().__init__()

            self.id = id
            self.websocket = websocket
            self.process = process
            self.complete = False

            task_name = f"on_message for responder {self.id}"
            self.receiving_task = asyncio.create_task(self.on_message(), name=task_name)
            # adds callback to task which raises exceptions occurring in coroutine
            self.receiving_task.add_done_callback(self.done_callback)

        def done_callback(self, future):
            """Re-raise whatever exception the finished task ended with."""
            future.result()

        async def on_message(self):
            """Read messages and hand them to ``process`` until ``complete`` is set."""
            while not self.complete:
                incoming = self.websocket.recv()
                await self.process(incoming)

        async def on_response(self, response):
            """Turn one server message into a ``data`` or an ``end`` event."""
            print("reading response: ", response)
            if response == "end":
                print("emitting end")
                self.emit("end")
                # set complete flag so self.receiving_task may complete
                self.complete = True
                return
            print("got non end response: ", response)
            self.emit("data", response)
    
    class Processor:
        """Owns one websocket connection and routes replies to per-request responders."""

        def __init__(self, wsApiUrl) -> None:
            """Open the connection to ``wsApiUrl`` and start with no responders."""
            self.wsApiUrl = wsApiUrl
            # websocket.enableTrace(True) # uncomment for detailed trace of websocket communications
            self.websocket = websocket.WebSocket()
            self.websocket.connect(self.wsApiUrl)
            # request id -> WebSocketResponder
            self.responders = {}
            self.responder_count = 0

        async def process(self, response):
            """Decode one JSON message and pass its ``msg`` to the responder for its ``id``."""
            decoded = json.loads(response)
            responder_id, msg = decoded["id"], decoded["msg"]
            target = self.responders[responder_id]
            await target.on_response(msg)

        async def send(self, msg):
            """Send ``msg`` and return a future that resolves to the aggregated reply.

            The result is every "data" payload followed by a newline, set once
            the responder emits "end". The future itself is returned, so the
            caller awaits this coroutine and then the future.
            """
            chunks = []
            request_id = self.responder_count

            responder = WebSocketResponder(request_id, self.websocket, self.process)
            future = asyncio.Future()
            self.responders[request_id] = responder

            self.websocket.send(json.dumps({"id": request_id, "msg": msg}))

            # The next request gets the next id.
            self.responder_count += 1

            @responder.on("data")
            def data_handler(payload):
                print("adding to response: ", payload)
                chunks.append(payload + "\n")

            @responder.on("end")
            def end_handler():
                print("end handler triggered")
                response = "".join(chunks)
                print("setting result: ", response)
                future.set_result(response)

            return future
    
    async def doTheThing():
        """Send one greeting through a fresh Processor and return the collected reply."""
        processor = Processor("ws://127.0.0.1:7890")

        # send() hands back a future; awaiting that future yields the reply text.
        pending = await processor.send("Hello, Server")
        reply = await pending
        return reply
    
    async def main():
        """Run the demo request and print what came back."""
        print("result: ", await doTheThing())
    
    
    if __name__ == "__main__":
        # asyncio.run creates the event loop, runs main() and closes the loop.
        asyncio.run(main())