Search code examples
pythontwistedtwisted.webtwisted.internet

Issues when running http server with deferred threads under high load in twisted


I am having some issues related to the twisted http framework. Specifically, I am trying to implement the usage of threads.deferToThread with callbacks, to allow for better concurrency, which works fine until you have a higher load of traffic trying to access the server at once. In that case, the callbacks often fail to execute at all, and nothing is being written to the transport.

To put it short, on each request, the server will process packets sent by the client inside a deferred thread. The server then writes response packets to a queue, which the on_request_done callback then dequeues and returns as a response. As previously mentioned, this works perfectly fine with 1 or 2 clients.

Here is a simplified version of the code I am working with:

class HttpPlayer(Player):
    def __init__(self, address: str, port: int) -> None:
        super().__init__(address, port)
        self.queue = Queue()
        self.token = ""

    def enqueue(self, data: bytes):
        self.queue.put(data)

    def dequeue(self, max: int = 4096) -> bytes:
        data = b""

        while not self.queue.empty():
            data += self.queue.get()

        return data

class HttpBanchoProtocol(Resource):
    isLeaf = True

    def __init__(self) -> None:
        self.player: HttpPlayer | None = None
        self.children = {}

    def handle_login_request(self, request: Request) -> bytes:
        username, password, client = (
            request.content.read().decode().splitlines()
        )

        deferred = threads.deferToThread(
            self.player.login_received,
            username,
            password,
            client
        )

        deferred.addCallbacks(
            lambda _: self.on_request_done(request),
            lambda f: self.on_request_error(request, f)
        )

        return NOT_DONE_YET

    def handle_request(self, request: Request) -> bytes:
        deferred = threads.deferToThread(
            self.process_packets,
            request.content.read()
        )

        deferred.addCallbacks(
            lambda _: self.on_request_done(request),
            lambda f: self.on_request_error(request, f)
        )

        return NOT_DONE_YET

    def login_received(self, username: str, password: str, client: str) -> None:
        # Processing login here
        ...

    def process_packets(self, request: bytes):
        # Processing packets here
        ...

    def on_request_done(self, request: Request) -> None:
        if request._disconnected:
            self.player.logger.warning('Client disconnected before response')
            return

        if request.finished:
            self.player.logger.warning('Request finished before response')
            return

        request.write(self.player.dequeue())
        request.finish()

    def on_request_error(
        self,
        request: Request,
        failiure: Failure
    ) -> None:
        request.setResponseCode(500)
        self.player.send_error()
        self.player.logger.error(
            f'Failed to process request: {failiure.getErrorMessage()}',
            exc_info=failiure.value
        )
        self.on_login_done(request)

    def render_POST(self, request: Request) -> bytes:
        request.setResponseCode(200)

        if not (token := request.getHeader('token')):
            return self.handle_login_request(request)

        if not (player := app.session.players.by_token(token)):
            request.setResponseCode(401)
            return b""

        self.player = player
        return self.handle_request(request)

Here is the code I am working on for more reference if needed: https://github.com/osuTitanic/anchor/blob/044a9b75dee8d36816bbb5d7af67dc8902f2d34e/app/http.py#L99


Solution

  • The solution here was to remove the player attribute from the class, ensuring that each request operated with its own instance of the player. This change improved thread safety and allowed the server to handle multiple requests concurrently without issues, resulting in correct responses being sent back to clients.