Search code examples
pythontornado

Performing long running compute intensive operation on background thread


I have written a REST API in Python Tornado framework which predicts the answer of a question from a given paragraph.


Here is the Python code for the Tornado handler:

def post(self):
    """
    This function predicts the response from the pre-trained Allen model
    """    
    try:
        request_payload = tornado.escape.json_decode(self.request.body)

        if (request_payload is None):
            return self._return_response(self, { "message": "Invalid request!" }, 400)

        context = request_payload["context"]
        question = request_payload["question"]

        if(context is None or not context):
            return self._return_response(self, { "message": "Context is not provided!" }, 400)

        if(question is None or not question):
            return self._return_response(self, { "message": "Question is not provided!" }, 400)

        # Compute intensive operation which blocks the main thread
        answer_prediction = predictor.predict(passage=str(context), question=str(question))
        best_answer = answer_prediction["best_span_str"] or "Sorry, no answer found for your question!"

        return self._return_response(self, { "answer": best_answer }, 200)

    except KeyError:
        #Return bad request if any of the keys are missing
        return self._return_response(self, { "message": 'Some keys are missing from the request!' }, 400)

    except json.decoder.JSONDecodeError:
        return self._return_response(self, { "message": 'Cannot decode request body!' }, 400)

    except Exception as ex:
        return self._return_response(self, { "message": 'Could not complete the request because of some error at the server!', "cause": ex.args[0], "stack_trace": traceback.format_exc(sys.exc_info()) }, 500)

The problem is that the line:

answer_prediction = predictor.predict(passage=str(context), question=str(question))

Blocks the main thread for incoming requests and waits until that long running operation is completed whilst blocking for other requests and times out the current request sometimes.


I have read this answer detailing a solution with putting the long running operation in queue, but I am not getting it.

Also due to Python's GIL only one thread can run at the same time, which forces me to spawn a separate process to deal with it, since processes are costly, are there any viable solution to my problem and how to deal with this kind of situations.

Here are my questions:

  • How to offload safely compute intensive operations to a background thread
  • How to handle timeouts and exceptions gracefully
  • How to maintain a queue structure for checking if the long running operation has been completed or not.

Solution

  • Run the blocking code in a separate thread. Use IOLoop.run_in_executor.

    Example:

    from functools import partial
    
    async def post(self):
        ...
    
        # create a partial object with the keyword arguments
        predict_partial = partial(predictor.predict, passage=str(context), question=str(question))
    
        answer_prediction = await IOLoop.current().run_in_executor(None, predict_partial)
    
        ...