python-2.7 http flask rabbitmq polling

Simulate a synchronous request on top of a background async job with Flask


I'll first explain the architecture of my system and then move to the question:

I have a REST API that serves as my API gateway. The server is built with Flask. I also have a RabbitMQ cluster and a client I wrote that listens on a specific queue and executes the tasks it receives.

Until now, all of my requests were asynchronous: each request that reached the API gateway carried a callback_uri field with a URL to POST the results to. The API gateway was only responsible for sending the task to RabbitMQ; the worker processed the task and, at the end, POSTed the results back to the callback URL.

My question is:

I want a new endpoint to be synchronous, in the sense that the processing is still done by the same worker as before, but the results come back to the API gateway, which returns them to the user and releases the connection.

My current solution:

I still send a unique callback_uri to the worker as part of the request, but that endpoint is now implemented by my API gateway and allows both POST and GET. The worker POSTs the results once it has finished, and my API gateway keeps polling the callback URL with GET until a result is available, then returns it to the client.

Is there a preferred alternative to having a busy-waiting HTTP worker poll its own endpoint for the results, one that is still synchronous, so that the connection is released only when the results become available?

Code for illustration only:

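import functools
import json
import time

import requests
from flask import request

# app, log, Timeout, TimeoutException, the ERROR_MSG_* constants,
# SYNC_REQUEST_TIMEOUT_SECONDS, create_callback_uri() and
# create_and_send_request_to_rabbitmq() are assumed to be defined elsewhere.


def sync_request(func):
    """Run the wrapped view, then block until the worker's result is available."""

    @functools.wraps(func)
    def call(*args, **kwargs):
        create_callback_uri()
        result = func(*args, **kwargs)
        status_code = result[1]
        # Only wait for a result if the task was queued successfully.
        if status_code == 200:
            result = get_callback_result()
        return result

    return call


def get_callback_result():
    """Poll the callback URL until the worker has POSTed a result or we time out."""
    callback_uri = request.get_json()['callback_uri']
    headers = {'content-type': 'application/json'}
    # Falls back to an empty JSON object if no answer arrives in time.
    content = json.dumps({})

    try:
        with Timeout(seconds=SYNC_REQUEST_TIMEOUT_SECONDS):
            while True:
                response = requests.get(callback_uri, headers=headers)
                if response.status_code == 200:
                    content = response.content
                    break
                time.sleep(0.2)
    except TimeoutException:
        log.debug('Timed out on sync request for request %s', request)

    return content, 200


# The decorator must be defined before it is applied to the view below.
@app.route('/long_task', methods=['POST'])
@sync_request
def long_task():
    try:
        if request.get_json() is None:
            return ERROR_MSG_NO_JSON, 400
        create_and_send_request_to_rabbitmq()
        return '', 200
    except Exception:
        return ERROR_MSG_NO_DATA, 400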

Solution

  • If I understand you correctly, you want your backend to wait for the response from a worker (via RabbitMQ). You can achieve that by implementing RPC over RabbitMQ. The key idea is to use a correlation ID: the gateway tags each request with a unique ID and the name of a reply queue, and the worker copies that ID onto its reply so the gateway can match it to the waiting request.

    But the most efficient approach would be to have the client connect over WebSockets (or a raw TCP socket if it is not a browser) and notify it directly when the job is done. That way you don't tie up resources (client connection, RabbitMQ queues) and you avoid the performance hit of RPC.
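
To make the correlation-ID idea concrete, here is a minimal sketch of the gateway-side bookkeeping only, using just the standard library. The class name PendingReplies and its methods are hypothetical, not part of Flask or any RabbitMQ client. It assumes a separate consumer thread in the gateway reads the reply queue and calls resolve() for each message, and that the broker calls themselves (publish, consume) are handled elsewhere.

```python
import threading
import uuid


class PendingReplies(object):
    """Maps correlation IDs to request threads waiting for a reply.

    Hypothetical helper for illustration; not a library API.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._waiters = {}  # correlation_id -> [Event, reply body]

    def register(self):
        """Create a fresh correlation ID to send along with the task."""
        correlation_id = str(uuid.uuid4())
        with self._lock:
            self._waiters[correlation_id] = [threading.Event(), None]
        return correlation_id

    def resolve(self, correlation_id, body):
        """Hand a worker's reply to the waiting request thread.

        Meant to be called by the reply-queue consumer. Returns False
        when nobody is waiting for this ID (unknown or timed out).
        """
        with self._lock:
            waiter = self._waiters.get(correlation_id)
        if waiter is None:
            return False
        waiter[1] = body
        waiter[0].set()
        return True

    def wait(self, correlation_id, timeout):
        """Block until the reply arrives or `timeout` seconds pass.

        Returns the reply body, or None on timeout.
        """
        with self._lock:
            waiter = self._waiters[correlation_id]
        arrived = waiter[0].wait(timeout)
        with self._lock:
            self._waiters.pop(correlation_id, None)
        return waiter[1] if arrived else None
```

In the Flask view, this would replace the polling loop: call register(), publish the task with that ID and a reply queue name (with pika, these would go in BasicProperties as correlation_id and reply_to), then return whatever wait() yields, treating None as a timeout. The request thread sleeps on an event instead of issuing HTTP requests every 200 ms. If the gateway runs as several processes, each one presumably needs its own reply queue, since a reply is only useful to the process holding the matching waiter.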