python http asynchronous tornado

How do I handle streaming data in Tornado asynchronously, while handling the response code synchronously?


My Tornado API handler calls another URL and streams the result back to the client. However, if the internal URL returns an error code, I want to handle the error separately myself and stream the error content to the client. What I currently have is this:

@web.asynchronous
@gen.coroutine
def get(self, job_id):
    url = ...
    client = httpclient.AsyncHTTPClient()

    # handle_chunk will forward received bytes to the client, allowing
    # other HTTP requests to be handled concurrently
    response = yield client.fetch(httpclient.HTTPRequest(
        url=url,
        streaming_callback=self._handle_chunk))
    self.finish()

def _handle_chunk(self, chunk):
    self.write(chunk)
    self.flush()

I need to modify this to only start forwarding chunks if the response code is in the 200 family, but the response isn't yielded by client.fetch until the whole request is complete. Any idea how to do this?


Solution

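  • Use both streaming_callback and header_callback. The header callback runs before the first call to streaming_callback. It is called once per header line, each passed as a string with its trailing newline: first the status line (e.g. "HTTP/1.1 200 OK"), then each header, then a final blank line. You'll have to parse the status line yourself. Note that fetch raises HTTPError for non-2xx responses by default, so pass raise_error=False if you want to inspect response.code afterwards:

    import tornado.httputil
    from tornado import gen, httpclient

    @gen.coroutine
    def get(self, job_id):
        url = ...
        client = httpclient.AsyncHTTPClient()
        self._start_line = None

        # raise_error=False returns non-2xx responses instead of
        # raising HTTPError
        response = yield client.fetch(httpclient.HTTPRequest(
            url=url,
            header_callback=self._handle_headers,
            streaming_callback=self._handle_chunk),
            raise_error=False)
        if response.code != 200:
            # handle error
            pass
        self.finish()

    def _handle_headers(self, line):
        # Called once per header line; only the status line starts with "HTTP/"
        if line.startswith("HTTP/"):
            self._start_line = tornado.httputil.parse_response_start_line(
                line.strip())

    def _handle_chunk(self, chunk):
        # Forward body chunks only for a 200 response; ignore them otherwise
        if self._start_line is not None and self._start_line.code == 200:
            self.write(chunk)
            self.flush()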
    

    It's not currently possible to cleanly cancel a request; if the status code is not 200, your callback will have to accept the streaming chunks and ignore them (and then you return an error elsewhere).
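
    The callbacks above test for exactly 200, while the question asks about the whole 200 family. As a rough sketch of that gating logic on its own, without Tornado: the StatusGate class and its method names below are hypothetical, not part of Tornado. It also assumes the error body is small enough to hold in memory, and that header lines arrive one at a time as strings.

```python
class StatusGate:
    """Hypothetical helper: forwards body chunks only for 2xx responses."""

    def __init__(self, forward):
        self.forward = forward   # callable that receives each forwarded chunk
        self.code = None         # status code, unknown until the status line arrives
        self.error_body = []     # chunks held back from a non-2xx response

    def on_header_line(self, line):
        # Only the status line starts with "HTTP/",
        # e.g. "HTTP/1.1 404 Not Found\r\n".
        if line.startswith("HTTP/"):
            self.code = int(line.split(" ", 2)[1])

    def on_chunk(self, chunk):
        if self.code is not None and 200 <= self.code < 300:
            self.forward(chunk)
        else:
            # The request cannot be cancelled, so keep accepting chunks
            # and set them aside for the error path.
            self.error_body.append(chunk)
```

    In a handler, gate.on_header_line and gate.on_chunk would presumably be passed as header_callback and streaming_callback, with forward being a function that calls self.write and self.flush. After the fetch completes, the contents of gate.error_body can be used to build your own error response.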