Search code examples
pythonmultithreadingasynchronoustornado

Call to async endpoint gets blocked by another thread


I have a tornado webservice which is going to serve something around 500 requests per minute. All these requests are going to hit 1 specific endpoint. There is a C++ program that I have compiled using Cython and use it inside the tornado service as my processor engine. Each request that goes to /check/ will trigger a function call in the C++ program (I will call it handler) and the return value will get sent to user as response.

This is how I wrap the handler class. One important point is that I do not instantiate the handler in __init__. There is another route in my tornado code that I want to start loading the DataStructure after an authroized request hits that route. (e.g. /reload/)


executors = ThreadPoolExecutor(max_workers=4)


class CheckerInstance(object):
    def __init__(self, *args, **kwargs):
        self.handler = None
        self.is_loading = False
        self.is_live = False

    def init(self):
        if not self.handler:
            self.handler = pDataStructureHandler()
            self.handler.add_words_from_file(self.data_file_name)
            self.end_loading()
            self.go_live()

    def renew(self):
        self.handler = None
        self.init()



class CheckHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument("q", None).encode('utf-8')
        answer = query

        if not checker_instance.is_live:
            self.write(dict(answer=self.get_argument("q", None), confidence=100))
            return

        checker_response = await checker_instance.get_response(query)
        answer = checker_response[0]
        confidence = checker_response[1]

        if self.request.connection.stream.closed():
            return
        self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))

    def on_connection_close(self):
        self.wait_future.cancel()


class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
    def prepare(self):
        self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')

    def new_file_exists(self):
        return True

    def can_reload(self):
        return not checker_instance.is_loading

    def get(self):
        error = False
        message = None

        if not self.can_reload():
            error = True
            message = 'another job is being processed!'
        else:
            if not self.new_file_exists():
                    error = True
                    message = 'no new file found!'
            else:
                checker_instance.go_fake()
                checker_instance.start_loading()
                tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
                message = 'job started!'

        if self.request.connection.stream.closed():
            return
        self.write(dict(
            success=not error, message=message
        ))

    def on_connection_close(self):
        self.wait_future.cancel()


def main():
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/check", CheckHandler),
            (r"/reload", InstanceReloadHandler),
            (r"/health", HealthHandler),
            (r"/log-event", SubmitLogHandler),
        ],
        debug=options.debug,
    )
    checker_instance = CheckerInstance()

I want this service to keep responding after checker_instance.renew starts running in another thread. But this is not what happens. When I hit the /reload/ endpoint and renew function starts working, any request to /check/ halts and waits for the reloading process to finish and then it starts working again. When the DataStructure is being loaded, the service should be in fake mode and respond to people with the same query that they send as input.

I have tested this code in my development environment with an i5 CPU (4 CPU cores) and it works just fine! But in the production environment (3 double-thread CPU cores) the /check/ endpoint halts requests.


Solution

  • It is difficult to fully trace the events being handled because you have clipped out some of the code for brevity. For instance, I don't see a get_response implementation here so I don't know if it is awaiting something itself that could be dependent on the state of checker_instance.

    One area I would explore is in the thread-safety (or seeming absence of) in passing the checker_instance.renew to run_in_executor. This feels questionable to me because you are mutating the state of a single instance of CheckerInstance from a separate thread. While it might not break things explicitly, it does seem like this could be introducing odd race conditions or unanticipated copies of memory that might explain the unexpected behavior you are experiencing

    If possible, I would make whatever load behavior you have that you want to offload to a thread be completely self-contained and when the data is loaded, return it as the function result which can then be fed back into you checker_instance. If you were to do this with the code as-is, you would want to await the run_in_executor call for its result and then update the checker_instance. This would mean the reload GET request would wait until the data was loaded. Alternatively, in your reload GET request, you could ioloop.spawn_callback to a function that triggers the run_in_executor in this manner, allowing the reload request to complete instead of waiting.