Tags: python, python-3.x, multithreading, python-multithreading, concurrent.futures

concurrent.futures.ThreadPoolExecutor / Multithreading runs out of memory (Killed)


I am currently working on a supposedly easy web scraping project while learning Python. I have a roughly 70 MB list containing a few million IP addresses (sys.argv[1]) that I want to process. Of course, not all of them are reachable.

I am trying to make use of concurrent.futures and am currently running into memory problems, which eventually lead to the whole process being killed.

Now, I have split my futures into two sets (done and not done) as suggested here. I am using about 100 workers (sys.argv[2]) and have 1 GB of memory available.

I thought all done futures would be released once future.result() is called, which I do once futures_done holds 1000 or more futures. However, this just seems to slow the process down, while memory keeps filling up until the process is killed.

What am I missing here? Any suggestions on how to handle this?

Thank you in advance.

My code is as follows:

import sys
import requests
import concurrent.futures
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def title(host):
    try:
        url="https://"+host
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        print(host+": "+title)
    except:
        pass

max=int(sys.argv[2])
with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    futures_done = set()
    futures_notdone = set()
    with open(sys.argv[1]) as f:
        for line in f:
            host = line.strip()
            futures_notdone.add(executor.submit(title, host))
            if len(futures_notdone) >= max:
                done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)
            for future in futures_done:
                if len(futures_done) >= 1000:
                    future.result()

Solution

  • Looks like you are storing done futures in a set without ever clearing it, so the set can grow very large. This is most likely the cause of your memory problem: calling future.result() does not release the future object, and it is still referenced in the futures_done set.
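
    For illustration, here is a minimal sketch of that idea applied to your original loop (it reuses your title() function and the other imports from your script unchanged): consume each completed future right away and drop every reference to it, instead of accumulating it in futures_done, so the finished futures can be garbage collected.

    import sys
    import concurrent.futures
    # title() and the remaining imports are unchanged from your script

    max = int(sys.argv[2])
    with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
        futures_notdone = set()
        with open(sys.argv[1]) as f:
            for line in f:
                futures_notdone.add(executor.submit(title, line.strip()))
                if len(futures_notdone) >= max:
                    done, futures_notdone = concurrent.futures.wait(
                        futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                    for future in done:
                        future.result()  # consume and discard; no reference is kept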

    Not perfect, but beyond that quick fix you can try the following. It schedules at most max jobs to be executed concurrently, periodically gathers the jobs that are done and schedules new ones in their place. The idea comes from this blog.

    The drawback I see in this method is that it must periodically poll the max scheduled jobs to find the ones that are done, which could be slow with large max values.

    import sys
    import requests
    import concurrent.futures
    import urllib3
    from itertools import islice
    from typing import Optional
    from lxml.html import fromstring
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    
    def title(host: str) -> Optional[str]:
        try:
            url="https://"+host
            r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
            tree = fromstring(r.content.decode('utf-8'))
            title = tree.findtext('.//title')
            return host+": "+title
        except Exception:
            return None
    
    max = int(sys.argv[2])
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
        with open(sys.argv[1]) as f:
            # strip the trailing newline so that the URL built in title() is valid
            futures = {executor.submit(title, h.strip()) for h in islice(f, max)}
            
            while futures:
                done, futures = concurrent.futures.wait(
                    futures, return_when=concurrent.futures.FIRST_COMPLETED)
    
                for future in done:
                    result = future.result()
                    if result is not None:  # title() returns None for unreachable hosts
                        print(result)
    
                # replace each finished job with a new host from the file
                for h in islice(f, len(done)):
                    futures.add(executor.submit(title, h.strip()))
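
    The script is invoked exactly like your original one, e.g. python script.py hosts.txt 100 (both names are hypothetical here): the first argument is the file with one host per line, the second the number of concurrent workers.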
    

    Here is another workaround that may work for you. It ran for more than 1 million iterations without using more than 150 MB on my computer.

    It is simply a custom thread pool with two queues that manage concurrent access to the shared resources and limit the maximum concurrency.

    import sys
    from typing import Optional
    import requests
    import urllib3
    from lxml.html import fromstring
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    from queue import Queue
    from threading import Thread
    
    
    def get_title(host: str) -> Optional[str]:
        try:
            url = f"https://{host}"
            r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=1, verify=False)
            tree = fromstring(r.content.decode('utf-8'))
            title = tree.findtext('.//title')
            return f"{host}: {title}"
        except Exception:
            return None
    
    class Pool:
        def __init__(self, work, max_concurrent_jobs, max_worker: int = 32) -> None:
            self.max_workers = max_worker
            self.work_queue = Queue(max_concurrent_jobs)
            self.out_queue = Queue()
            self.is_running = True
    
            def _work():
                while self.is_running:
                    item = self.work_queue.get()
                    result = work(item)
                    self.work_queue.task_done()
                    self.out_queue.put(result)
    
            # daemon threads, so the process can exit once all queued work is done
            for _ in range(max_worker):
                Thread(target=_work, daemon=True).start()
    
        def close(self):
            self.is_running = False
    
    
    if __name__ == "__main__":
        file_name = sys.argv[1]
        max = int(sys.argv[2])
        pool = Pool(work=get_title, max_concurrent_jobs=max)
    
        def worker():
            while True:
                item = pool.out_queue.get()
                if item is not None:
                    print(item) # Or any follow-up job
                pool.out_queue.task_done()
    
        Thread(target=worker, daemon=True).start()
    
        with open(file_name) as f:
            for h in f:
                pool.work_queue.put(h.strip())

        # wait until every host has been processed and every result printed,
        # then stop the pool; the daemon worker threads die with the process
        pool.work_queue.join()
        pool.out_queue.join()
        pool.close()
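
    The bounded work_queue is what keeps memory usage flat: Queue(max_concurrent_jobs) makes put() block as soon as max hosts are pending, so the reading loop can never get ahead of the workers, and only the results waiting in out_queue accumulate between prints, regardless of how large the input file is.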