Tags: python, asynchronous, python-requests, twisted

Asynchronous download of files with twisted and (tx)requests


I'm trying to download files from the internet from within a twisted application. I'd like to do this using requests because of the other features it provides, either directly or through well-maintained libraries (retries, proxies, cachecontrol, etc.). I am open to a twisted-only solution that lacks these features, but I can't seem to find one anyway.

The files are expected to be fairly large and will be downloaded over slow connections. I'm therefore using requests' stream=True interface and the response's iter_content. A more or less complete code fragment is listed at the end of this question. The entry point is the http_download function, called with a url, a dst to write the file to, a callback, and an optional errback to handle a failed download. I've stripped away some of the code involved in preparing the destination (creating folders, etc.) and the code that closes the session during reactor exit, but I think it should still work as is.

This code works. The file is downloaded and the twisted reactor continues to operate. However, I seem to have a problem with this bit of code:

def _stream_download(r, f):
    for chunk in r.iter_content(chunk_size=128):
        f.write(chunk)
        yield None

cooperative_dl = cooperate(_stream_download(response, filehandle))

Because iter_content returns only when it has a chunk to return, the reactor handles a chunk, runs other bits of code, and then goes back to blocking on the next chunk, instead of keeping itself busy with other work such as updating a spinning wait animation on the GUI (that code is not posted here).

Here are the questions:

  • Is there a way to get twisted to operate on this generator in such a way that it yields control when the generator itself is not prepared to yield something? I came across some docs for twisted.flow which seemed appropriate, but it either never made it into twisted or no longer exists today. This question can be read independently of the specifics, i.e., with respect to any arbitrary blocking generator, or in the immediate context of this download.
  • Is there a way to get twisted to download files asynchronously using something full-featured like requests? Is there an existing twisted module which just does this which I can just use?
  • What would the basic approach be to such a problem with twisted, independent of the http features I want to use from requests? Let's assume I'm prepared to ditch them or otherwise implement them. How would I download a file asynchronously over HTTP?

The full code fragment:

import os
import re
from functools import partial
from six.moves.urllib.parse import urlparse

from requests import HTTPError
from twisted.internet.task import cooperate
from txrequests import Session

class HttpClientMixin(object):
    def __init__(self, *args, **kwargs):
        self._http_session = None

    def http_download(self, url, dst, callback, errback=None, **kwargs):
        dst = os.path.abspath(dst)
        # Log request
        deferred_response = self.http_session.get(url, stream=True, **kwargs)
        deferred_response.addCallback(self._http_check_response)
        deferred_response.addCallbacks(
            partial(self._http_download, destination=dst, callback=callback),
            partial(self._http_error_handler, url=url, errback=errback)
        )

    def _http_download(self, response, destination=None, callback=None):
        def _stream_download(r, f):
            for chunk in r.iter_content(chunk_size=128):
                f.write(chunk)
                yield None

        def _rollback(failure, r, f, d):
            # An errback receives the Failure as its first positional argument.
            if r:
                r.close()
            if f:
                f.close()
            if os.path.exists(d):
                os.remove(d)

        filehandle = open(destination, 'wb')
        cooperative_dl = cooperate(_stream_download(response, filehandle))
        # Call close(); a bare `response.close` only returns the bound method.
        cooperative_dl.whenDone().addCallback(lambda _: response.close())
        cooperative_dl.whenDone().addCallback(lambda _: filehandle.close())
        cooperative_dl.whenDone().addCallback(
            partial(callback, url=response.url, destination=destination)
        )
        cooperative_dl.whenDone().addErrback(
            partial(_rollback, r=response, f=filehandle, d=destination)
        )

    def _http_error_handler(self, failure, url=None, errback=None):
        failure.trap(HTTPError)
        # Log error message
        if errback:
            errback(failure)

    @staticmethod
    def _http_check_response(response):
        response.raise_for_status()
        return response

    @property
    def http_session(self):
        if not self._http_session:
            # Log session start
            self._http_session = Session()
        return self._http_session

Solution

  • Is there a way to get twisted to operate on this generator in such a way that it yields control when the generator itself is not prepared to yield something?

    No. All Twisted can do is invoke the code. If the code blocks indefinitely, then the calling thread is blocked indefinitely. This is a basic premise of the Python runtime.
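
To make the point concrete, here is a minimal, Twisted-free sketch. The scheduler `run_round_robin` and the generators `blocking_chunks` and `spinner` are hypothetical names invented for this illustration; this is a toy stand-in for the idea behind cooperate, not Twisted's actual implementation. It shows that a single-threaded scheduler can only switch tasks when a generator yields, so while `next()` is blocked the other task simply waits.

```python
import time
from collections import deque


def blocking_chunks(n, delay):
    """Stand-in for r.iter_content(): blocks inside next() until data 'arrives'."""
    for i in range(n):
        time.sleep(delay)  # the scheduler cannot regain control during this call
        yield "chunk%d" % i


def spinner(n):
    """Stand-in for GUI work that would like to run continuously."""
    for i in range(n):
        yield "tick%d" % i


def run_round_robin(tasks):
    """Toy scheduler: advance each generator one step per turn, on one thread."""
    events = []
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            events.append(next(task))  # blocks for as long as the task blocks
        except StopIteration:
            continue  # task finished, do not reschedule it
        queue.append(task)
    return events


events = run_round_robin([blocking_chunks(3, 0.05), spinner(3)])
print(events)
# ['chunk0', 'tick0', 'chunk1', 'tick1', 'chunk2', 'tick2']
```

The spinner gets exactly one turn per chunk, which matches the symptom described in the question: nothing else runs while the generator is waiting for data.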

    Is there a way to get twisted to download files asynchronously using something full-featured like requests?

    There's treq. You didn't say what "full-featured" means here but earlier you mentioned "retries", "proxies", and "cachecontrol". I don't believe treq currently has these features. You can find some kind of feature matrix in the treq docs (though I notice it doesn't include any of the features you mentioned - even for requests). I expect implementations of such features would be welcome as treq contributions.

    Is there a way to get twisted to download files asynchronously using something full-featured like requests?

    Run it in a thread - probably using Twisted's threadpool APIs.
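
One way this could look is sketched below, assuming plain requests (rather than txrequests) and `twisted.internet.threads.deferToThread`. The helper names `write_chunks`, `blocking_download`, and `http_download`, and the 64 KiB chunk size, are assumptions made for this example. The whole blocking loop runs in a worker thread, so the reactor stays free while iter_content waits for data.

```python
import os


def write_chunks(chunks, dst):
    """Blocking helper: write an iterable of byte chunks to dst.

    Returns the number of bytes written; removes the partial file on error.
    """
    written = 0
    try:
        with open(dst, "wb") as f:
            for chunk in chunks:
                f.write(chunk)
                written += len(chunk)
    except Exception:
        if os.path.exists(dst):
            os.remove(dst)
        raise
    return written


def blocking_download(url, dst, chunk_size=64 * 1024, **kwargs):
    """Runs entirely in a worker thread; it must not call reactor APIs."""
    # Imported here so write_chunks stays usable without requests installed.
    import requests

    with requests.get(url, stream=True, **kwargs) as response:
        response.raise_for_status()
        return write_chunks(response.iter_content(chunk_size=chunk_size), dst)


def http_download(url, dst, **kwargs):
    """Return a Deferred that fires with the byte count once dst is written."""
    # Imported here so the helpers above stay usable without Twisted installed.
    from twisted.internet.threads import deferToThread

    return deferToThread(blocking_download, url, dst, **kwargs)
```

Callbacks added to the returned Deferred run in the reactor thread, so they can update the GUI as usual. Each download in progress occupies one thread from the reactor's thread pool for its whole duration.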

    What would the basic approach be to such a problem with twisted, independent of the http features I want to use from requests?

    treq.
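
A minimal sketch of a streaming download with treq, assuming `treq.get(url, unbuffered=True)` and `treq.collect(response, collector)` as described in the treq documentation. The function name `download_file` and its `client` parameter are hypothetical; `client` exists only so the function can be exercised with a stand-in object and defaults to the treq module.

```python
def download_file(url, dst, client=None):
    """Stream url into the file dst without blocking the reactor.

    Returns a Deferred that fires once the whole body has been written.
    `client` is a hypothetical injection point; it defaults to treq.
    """
    if client is None:
        import treq as client  # only needed for real downloads

    f = open(dst, "wb")

    def close_file(result):
        f.close()
        return result  # pass the result (or Failure) through unchanged

    # unbuffered=True asks treq not to hold the whole body in memory.
    d = client.get(url, unbuffered=True)
    # collect() feeds each received chunk to f.write as it arrives.
    d.addCallback(client.collect, f.write)
    d.addBoth(close_file)
    return d
```

This sketch does not check the HTTP status code or remove a partially written file on failure; both would need to be added as extra callbacks and errbacks on the returned Deferred.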