I want to download and process a lot of files from a website. The site's terms of service restrict the number of files you're permitted to download per second.
The time it takes to process the files is actually the bottleneck, so I'd like to process multiple files in parallel, but I don't want the separate processes to combine to violate the download limit. So I need something that limits the overall request rate. I was thinking of something like the following, but I'm not exactly an expert with the multiprocessing module.
import multiprocessing
from multiprocessing.managers import BaseManager
import time


class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url


class DownloadManager(BaseManager):
    pass


DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")
Then somewhere else I run the downloads with:
manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()
This seems to do the job on a small scale, but I'm a little wary about whether the locking is really being done correctly.
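One variant I've also considered (untested sketch, IntervalLimiter is just a name I made up) records the timestamp of the last request so each caller only sleeps out whatever is left of the interval, instead of always holding the lock for the full delay:

import multiprocessing
import time


class IntervalLimiter(object):
    """Drop-in for DownloadLimiter, registered with the same BaseManager so
    the single real instance lives in the manager process."""

    def __init__(self, interval):
        self.interval = interval
        self.lock = multiprocessing.Lock()
        self.last_request = 0.0

    def get(self, url):
        with self.lock:
            # Only wait out what remains of the interval since the last call.
            wait = self.interval - (time.monotonic() - self.last_request)
            if wait > 0:
                time.sleep(wait)
            self.last_request = time.monotonic()
        return url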
Also, if there's a better pattern for achieving the same goal, I'd love to hear it.
The simplest approach is to download on the main thread and feed documents to the worker pool.
In my own implementations of this I've gone the route of using celery for processing documents and gevent for downloads, which does the same thing, just with more complexity.
Here's a simple example.
import multiprocessing
from multiprocessing import Pool
import time
import typing


def work(doc: str) -> str:
    # do some processing here....
    return doc + " processed"


def download(url: str) -> str:
    return url  # a hack for demo, use e.g. `requests.get()`


def run_pipeline(
    urls: typing.List[str],
    session_request_limit: int = 10,
    session_length: int = 60,
) -> None:
    """
    Download and process each url in `urls` at a max. rate limit
    given by `session_request_limit / session_length`
    """
    workers = Pool(multiprocessing.cpu_count())
    results = []

    n_requests = 0
    session_start = time.time()

    for url in urls:
        doc = download(url)
        results.append(
            workers.apply_async(work, (doc,))
        )
        n_requests += 1

        if n_requests >= session_request_limit:
            # Sleep out whatever is left of the current session window.
            time_to_next_session = session_length - (time.time() - session_start)
            time.sleep(max(0.0, time_to_next_session))

        if time.time() - session_start >= session_length:
            session_start = time.time()
            n_requests = 0

    # Collect results
    for result in results:
        print(result.get())


if __name__ == "__main__":
    urls = ["www.google.com", "www.stackoverflow.com"]
    run_pipeline(urls)
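And a rough sketch of the gevent download side I mentioned (assumes gevent is installed; rate_limited_download_all and the stub download are just for illustration, and the handoff to the processing pool is left out):

import gevent


def download(url):
    return url  # placeholder, as above; swap in a real HTTP call


def rate_limited_download_all(urls, delay=0.5):
    # Space the greenlet spawns by `delay` seconds so the overall request
    # rate stays under the limit; gevent.sleep yields to any downloads
    # that are already in flight.
    jobs = []
    for url in urls:
        jobs.append(gevent.spawn(download, url))
        gevent.sleep(delay)
    gevent.joinall(jobs)
    return [job.value for job in jobs]


if __name__ == "__main__":
    print(rate_limited_download_all(["www.google.com", "www.stackoverflow.com"]))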