Tags: python, python-multiprocessing, multiprocessing-manager

Rate Limit Downloads Amongst Multiple Processes


I want to download and process a lot of files from a website. The site's terms of service restrict the number of files you're permitted to download per second.

The time it takes to process the files is actually the bottleneck, so I'd like to be able to process multiple files in parallel. But I don't want the different processes to combine to violate the download limit, so I need something that limits the overall request rate. I was thinking of something like the following, but I'm not exactly an expert with the multiprocessing module.

import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url


class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")

Then, somewhere else, I run the downloads with

manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()

This seems to do the job on a small scale, but I'm a little wary about whether the locking is really being done correctly.

Also, if there's a better pattern for achieving the same goal, I'd love to hear it.


Solution

  • The simplest approach is to download on the main thread and feed documents to the worker pool.

    In my own implementations of this I've gone the route of using celery for processing documents and gevent for downloads, which achieves the same thing, just with more complexity. (A rough sketch of that variant follows the example below.)

    Here's a simple example.

    import multiprocessing
    from multiprocessing import Pool
    import time
    import typing
    
    def work(doc: str) -> str:
        # do some processing here....
        return doc + " processed"
    
    def download(url: str) -> str:
        return url  # a hack for demo, use e.g. `requests.get()`
    
    def run_pipeline(
        urls: typing.List[str],
        session_request_limit: int = 10,
        session_length: int = 60,
    ) -> None:
        """
        Download and process each url in `urls` at a max. rate limit
        given by `session_request_limit / session_length`
        """
        workers = Pool(multiprocessing.cpu_count())
        results = []
    
        n_requests = 0
        session_start = time.time()
    
        for url in urls:
            doc = download(url)
            results.append(
                workers.apply_async(work, (doc,))
            )
            n_requests += 1
    
            if n_requests >= session_request_limit:
                time_to_next_session = session_length - (time.time() - session_start)
                time.sleep(max(0.0, time_to_next_session))
    
            if time.time() - session_start >= session_length:
                session_start = time.time()
                n_requests = 0
    
        # Collect results
        for result in results:
            print(result.get())
    
    if __name__ == "__main__":
        urls = ["www.google.com", "www.stackoverflow.com"]
        run_pipeline(urls)
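
    With the defaults above, the pipeline starts at most 10 downloads per 60-second window; tune `session_request_limit` and `session_length` to match the site's terms of service.

    For reference, here is a rough sketch of the celery + gevent variant mentioned above. The Redis broker URL, the `process_doc` task name, and the one-second spacing between downloads are illustrative assumptions, not part of the original answer.

    # Sketch only: assumes a local Redis broker and that celery workers are
    # started separately (e.g. `celery -A rate_limited_pipeline worker`).
    from gevent import monkey
    monkey.patch_all()  # make blocking socket I/O cooperative

    import gevent
    from gevent.pool import Pool as GeventPool
    import requests
    from celery import Celery

    app = Celery("rate_limited_pipeline", broker="redis://localhost:6379/0")

    @app.task
    def process_doc(content: str) -> str:
        # heavy processing runs in the celery worker processes
        return content + " processed"

    def fetch_and_enqueue(url: str) -> None:
        content = requests.get(url).text
        process_doc.delay(content)  # hand the document off to celery

    def download_all(urls, delay: float = 1.0, max_in_flight: int = 10) -> None:
        pool = GeventPool(max_in_flight)
        for url in urls:
            pool.spawn(fetch_and_enqueue, url)
            gevent.sleep(delay)  # start at most one new download per `delay` seconds
        pool.join()

    if __name__ == "__main__":
        download_all(["https://www.google.com", "https://www.stackoverflow.com"])

    The gevent pool lets slow downloads overlap while new requests still start at most once per `delay` seconds, and the separately started celery workers do the heavy processing, so the download rate limit and the processing parallelism stay decoupled.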