Tags: python, asynchronous, tornado

tornado: AsyncHTTPClient.fetch from an iterator?


I'm trying to write a web crawler and want to make HTTP requests as quickly as possible. Tornado's AsyncHTTPClient seems like a good choice, but all the example code I've seen (e.g. https://stackoverflow.com/a/25549675/1650177) basically calls AsyncHTTPClient.fetch on a huge list of URLs and lets Tornado queue them up and eventually make the requests.

But what if I want to process an indefinitely long (or just a really big) list of URLs from a file or the network? I don't want to load all the URLs into memory.

I've Googled around but can't seem to find a way to call AsyncHTTPClient.fetch from an iterator. I did, however, find a way to do what I want using gevent: http://gevent.org/gevent.threadpool.html#gevent.threadpool.ThreadPool.imap. Is there a way to do something similar in Tornado?
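
For reference, the gevent approach linked above looks roughly like this (just a sketch, not Tornado; read_urls and fetch are names I've made up here, and it assumes the requests library for the blocking HTTP call):

    import gevent.threadpool
    import requests  # assumed blocking HTTP library for this sketch

    def read_urls():
        # lazily yield URLs so the whole file never sits in memory
        with open("urls.txt") as f:
            for line in f:
                yield line.strip()

    def fetch(url):
        return requests.get(url)

    # imap runs fetch over the URLs using up to 10 worker threads
    pool = gevent.threadpool.ThreadPool(10)
    for response in pool.imap(fetch, read_urls()):
        print(response.status_code)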

One solution I've thought of is to only queue up so many URLs initially and then add logic to queue up more whenever a fetch operation completes, but I'm hoping there's a cleaner solution.
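
A rough sketch of that "refill on completion" idea, using the Futures returned by fetch (names like MAX_IN_FLIGHT and read_urls are placeholders, and error handling and shutdown are kept minimal):

    from tornado.httpclient import AsyncHTTPClient
    from tornado.ioloop import IOLoop

    MAX_IN_FLIGHT = 10          # how many fetches to keep running at once
    http_client = AsyncHTTPClient()
    io_loop = IOLoop.current()
    pending = 0                 # number of fetches currently in flight

    def read_urls():
        # lazily yield URLs so the whole list never sits in memory
        with open("urls.txt") as f:
            for line in f:
                yield line.strip()

    url_iter = read_urls()

    def fetch_next():
        global pending
        try:
            url = next(url_iter)
        except StopIteration:
            if pending == 0:
                io_loop.stop()  # iterator exhausted and nothing in flight
            return
        pending += 1
        future = http_client.fetch(url)
        io_loop.add_future(future, lambda f: on_done(url, f))

    def on_done(url, future):
        global pending
        pending -= 1
        try:
            print('got response from', url, future.result().code)
        except Exception:
            print('failed to fetch', url)
        fetch_next()            # a slot freed up: start the next fetch

    for _ in range(MAX_IN_FLIGHT):
        fetch_next()
    io_loop.start()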

Any help or recommendations would be appreciated!


Solution

  • I would do this with a Queue and multiple workers, in a variation on https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py

    import tornado.queues
    from tornado import gen
    from tornado.httpclient import AsyncHTTPClient
    from tornado.ioloop import IOLoop
    
    NUM_WORKERS = 10
    QUEUE_SIZE = 100
    q = tornado.queues.Queue(QUEUE_SIZE)
    AsyncHTTPClient.configure(None, max_clients=NUM_WORKERS)
    http_client = AsyncHTTPClient()
    
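    # Each worker pulls one URL at a time off the queue and fetches it,
    # so at most NUM_WORKERS fetches run concurrently.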
    @gen.coroutine
    def worker():
        while True:
            url = yield q.get()
            try:
                response = yield http_client.fetch(url)
                print('got response from', url)
            except Exception:
                print('failed to fetch', url)
            finally:
                q.task_done()
    
    @gen.coroutine
    def main():
        for i in range(NUM_WORKERS):
            IOLoop.current().spawn_callback(worker)
        with open("urls.txt") as f:
            for line in f:
                url = line.strip()
                # When the queue fills up, stop here to wait instead
                # of reading more from the file.
                yield q.put(url)
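        # Wait for every queued URL to be fetched before returning.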
        yield q.join()
    
    if __name__ == '__main__':
        IOLoop.current().run_sync(main)
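
The gen.coroutine style above matches the Tornado 4.x era; on Tornado 5+ (Python 3.5+) the same pattern can be written with native coroutines. A sketch of the equivalent code, under the same assumptions (a urls.txt file with one URL per line):

    import tornado.queues
    from tornado.httpclient import AsyncHTTPClient
    from tornado.ioloop import IOLoop

    NUM_WORKERS = 10
    QUEUE_SIZE = 100
    q = tornado.queues.Queue(QUEUE_SIZE)
    AsyncHTTPClient.configure(None, max_clients=NUM_WORKERS)
    http_client = AsyncHTTPClient()

    async def worker():
        while True:
            url = await q.get()
            try:
                response = await http_client.fetch(url)
                print('got response from', url)
            except Exception:
                print('failed to fetch', url)
            finally:
                q.task_done()

    async def main():
        for _ in range(NUM_WORKERS):
            IOLoop.current().spawn_callback(worker)
        with open("urls.txt") as f:
            for line in f:
                # put() waits (asynchronously) when the queue is full, so the
                # file is read no faster than the workers drain the queue.
                await q.put(line.strip())
        await q.join()

    if __name__ == '__main__':
        IOLoop.current().run_sync(main)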