I am currently working on a supposedly easy web scraping project while learning Python. I have a list of about 70 MB with a few million IP addresses (sys.argv[1]) that I want to process. Of course, not all of them are reachable.
I am trying to make use of concurrent.futures and am currently experiencing memory problems - eventually leading to the whole process being killed.
Now, I have split my futures into two sets (done and not done), as suggested here. I am using about 100 workers (sys.argv[2]) and have 1 GB of memory available.
I thought all done futures would be released once future.result() is called - with >= 1000 futures done. However, it just seems to slow down the process, with memory filling up until the process is killed.
What am I missing here? Any suggestions on how to handle this?
Thank you in advance.
My code is as follows:
import sys
import requests
import concurrent.futures
import urllib3
from lxml.html import fromstring

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def title(host):
    try:
        url = "https://" + host
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        print(host + ": " + title)
    except:
        pass

max = int(sys.argv[2])

with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    futures_done = set()
    futures_notdone = set()
    with open(sys.argv[1]) as f:
        for line in f:
            host = line.strip()
            futures_notdone.add(executor.submit(title, host))
            if len(futures_notdone) >= max:
                done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)
                for future in futures_done:
                    if len(futures_done) >= 1000:
                        future.result()
It looks like you are storing done futures in a set without ever clearing it, so it can grow very large. This is most likely the cause of your memory problem. Calling .result() on a future does not release it; it is still referenced in the futures_done set.
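For illustration, a minimal change to stop the growth would be to consume the done futures on the spot and keep no reference to them afterwards (just a sketch, reusing your title() helper and max):

with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    futures_notdone = set()
    with open(sys.argv[1]) as f:
        for line in f:
            futures_notdone.add(executor.submit(title, line.strip()))
            if len(futures_notdone) >= max:
                done, futures_notdone = concurrent.futures.wait(
                    futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    future.result()   # consume and forget, nothing accumulates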
Not perfect, but you can try the following. It keeps at most max jobs scheduled concurrently, periodically gathers the done ones and schedules new jobs in their place. The idea comes from this blog.
The drawback I see in this method is that it has to poll the max scheduled jobs over and over to find the ones that are done, which could be slow with large max values.
import sys
import requests
import concurrent.futures
import urllib3
from itertools import islice
from typing import Optional
from lxml.html import fromstring

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def title(host: str) -> Optional[str]:
    try:
        url = "https://" + host
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        return host + ": " + title
    except Exception:
        return None

max = int(sys.argv[2])

with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    with open(sys.argv[1]) as f:
        # prime the pool with the first `max` jobs
        futures = {executor.submit(title, h.strip()) for h in islice(f, max)}
        while futures:
            # wait until at least one of them is done
            done, futures = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                result = future.result()
                if result is not None:
                    print(result)
            # top the pool back up with as many new jobs as just finished
            for h in islice(f, len(done)):
                futures.add(executor.submit(title, h.strip()))
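If that polling becomes a bottleneck, an alternative sketch (my variable names; title() and max are assumed to be defined as above) is to cap the number of in-flight jobs with a semaphore and consume each result in a done-callback instead of waiting in a loop:

import threading

# title() and max reused from the snippet above
slots = threading.BoundedSemaphore(max)

def on_done(future):
    slots.release()                      # free a slot for the next submission
    result = future.result()
    if result is not None:
        print(result)

with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    with open(sys.argv[1]) as f:
        for line in f:
            slots.acquire()              # blocks while `max` jobs are in flight
            future = executor.submit(title, line.strip())
            future.add_done_callback(on_done)

With this pattern nothing is polled: each result is handled by the callback as soon as its job finishes, and the semaphore keeps at most max futures alive at any time.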
Here is a workaround that may work for you. It ran for more than 1 million iterations without using more than 150 MB on my computer.
It is simply a custom thread pool built on two queues: a bounded work queue that limits how many jobs can be pending at once (which is what keeps memory flat), and an output queue for the results.
import sys
from typing import Optional

import requests
import urllib3
from lxml.html import fromstring

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

from queue import Queue
from threading import Thread


def get_title(host: str) -> Optional[str]:
    try:
        url = f"https://{host}"
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=1, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        return f"{host}: {title}"
    except Exception:
        return None


class Pool:
    def __init__(self, work, max_concurrent_jobs: int, max_worker: int = 32) -> None:
        self.max_workers = max_worker
        # Bounded queue: put() blocks once max_concurrent_jobs items are pending.
        self.work_queue = Queue(max_concurrent_jobs)
        self.out_queue = Queue()
        self.is_running = True

        def _work():
            while self.is_running:
                item = self.work_queue.get()
                result = work(item)
                self.out_queue.put(result)
                self.work_queue.task_done()

        for _ in range(max_worker):
            Thread(target=_work, daemon=True).start()

    def close(self):
        self.is_running = False


if __name__ == "__main__":
    file_name = sys.argv[1]
    max = int(sys.argv[2])

    pool = Pool(work=get_title, max_concurrent_jobs=max)

    def worker():
        while True:
            item = pool.out_queue.get()
            if item is not None:
                print(item)  # Or any follow-up job
            pool.out_queue.task_done()

    Thread(target=worker, daemon=True).start()

    with open(file_name) as f:
        for h in f:
            # Blocks here when the work queue is full.
            pool.work_queue.put(h.strip())

    # Wait until every queued host has been processed and every result handled.
    pool.work_queue.join()
    pool.out_queue.join()
    pool.close()
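With daemon worker threads the script does not hang after the file has been read: the two join() calls wait until every queued host has been processed and every result has been handled, and the threads are simply discarded when the process exits.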