python, parallel-processing, concurrent.futures

How to store concurrent.futures ProcessPoolExecutor HTTP responses and process in real time?


I'm working on a project that uses concurrent.futures.ProcessPoolExecutor to send a large number of HTTP requests. The code below works well for making the requests, but I'm struggling to process the responses as they arrive. I tried inserting them into a sqlite3 database as they came in, but managing locks and avoiding global variables became tricky.

Ideally, I'd like to start the pool and read/store the data while it is still executing. Is this possible, or should I take a different approach?

from concurrent.futures import ProcessPoolExecutor

# s and millis() are not shown in the question
def http2_get(url):
    while True:
        try:
            start_time = millis()
            result = s.get(url, verify=False)
            print(url + " Total took " + str(millis() - start_time) + " ms")
            return result
        except Exception as e:
            # Print the error and its line number, then retry
            print(e, e.__traceback__.tb_lineno)

pool = ProcessPoolExecutor(max_workers=60)
results = list(pool.map(http2_get, urls))

Solution

  • As you noticed, list(pool.map(...)) does not return until every task has finished, and map yields results in submission order rather than completion order. I assume that you want to process the data in the main process.

    Instead of using map, submit all the tasks and process them as they finish:

    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    pool = ProcessPoolExecutor(max_workers=60)
    futures_list = [pool.submit(http2_get, url) for url in urls]
    
    for future in as_completed(futures_list):
        exception = future.exception()
        if exception is not None:
            # Handle exception in http2_get
            pass
        else:
            result = future.result()
            # process result...
    

    Note that it is cleaner to use the ProcessPoolExecutor as a context manager, which shuts the pool down when the block exits:

    with ProcessPoolExecutor(max_workers=60) as pool:
        futures_list = [pool.submit(http2_get, url) for url in urls]
        
        for future in as_completed(futures_list):
            exception = future.exception()
            if exception is not None:
                # Handle exception in http2_get
                pass
            else:
                result = future.result()
                # process result...
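
For the SQLite part of the question, one way to avoid locks and globals is to keep the database connection in the main process and insert each result as its future completes. The following is a minimal sketch under assumptions, not the original poster's code: fake_fetch is a hypothetical stand-in for http2_get that returns a plain (status, body) tuple without touching the network, and the responses table, the fetch_and_store helper, and its executor_cls parameter are names invented for illustration.

```python
import sqlite3
from concurrent.futures import ProcessPoolExecutor, as_completed


def fake_fetch(url):
    """Hypothetical stand-in for http2_get: no network I/O, plain picklable result."""
    return 200, "body of " + url


def fetch_and_store(urls, worker, conn, executor_cls=ProcessPoolExecutor, max_workers=4):
    """Run worker(url) in a pool and insert each result as soon as it completes.

    Only the calling process touches the SQLite connection, so the workers
    need no locks or globals. Returns a list of (url, exception) pairs for
    the tasks that raised.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS responses "
        "(url TEXT PRIMARY KEY, status INTEGER, body TEXT)"
    )
    failures = []
    with executor_cls(max_workers=max_workers) as pool:
        # Map each future back to its URL so results can be attributed.
        future_to_url = {pool.submit(worker, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                status, body = future.result()
            except Exception as exc:
                # The worker raised; record it and keep draining the pool.
                failures.append((url, exc))
                continue
            conn.execute(
                "INSERT OR REPLACE INTO responses VALUES (?, ?, ?)",
                (url, status, body),
            )
            conn.commit()
    return failures
```

A call might look like fetch_and_store(urls, fake_fetch, sqlite3.connect("responses.db")), placed under an if __name__ == "__main__": guard on platforms that start worker processes with spawn. Returning small plain values from the worker, rather than whole response objects, keeps the data that has to be pickled back to the main process small.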