I have a project I am working on, and I'm looking to use concurrent.futures ProcessPoolExecutor to send a large number of HTTP requests. While the code below works great for making the requests, I'm struggling to find a way to process the responses as they arrive. I tried inserting them into a sqlite3 database as I got each response, but it became tricky to manage locks and avoid global variables.
Ideally, I'd like to start the pool and, while it is executing, be able to read/store the data. Is this possible, or should I take a different route?
pool = ProcessPoolExecutor(max_workers=60)
results = list(pool.map(http2_get, urls))

def http2_get(url):
    while True:
        try:
            start_time = millis()
            result = s.get(url, verify=False)
            print(url + " Total took " + str(millis() - start_time) + " ms")
            return result
        except Exception as e:
            print(e, e.__traceback__.tb_lineno)
            pass
As you noticed, list(pool.map(...)) will not return until all the tasks have finished, and map yields results in the order of urls rather than in the order they complete. I assume that you want to process the data in the main process.
Instead of using map, submit all the tasks and process them as they finish:
from concurrent.futures import ProcessPoolExecutor, as_completed

pool = ProcessPoolExecutor(max_workers=60)
futures_list = [pool.submit(http2_get, url) for url in urls]

for future in as_completed(futures_list):
    exception = future.exception()
    if exception is not None:
        # Handle exception in http2_get
        pass
    else:
        result = future.result()
        # process result...
Note that it is cleaner to use the ProcessPoolExecutor as a context manager:
with ProcessPoolExecutor(max_workers=60) as pool:
    futures_list = [pool.submit(http2_get, url) for url in urls]

    for future in as_completed(futures_list):
        exception = future.exception()
        if exception is not None:
            # Handle exception in http2_get
            pass
        else:
            result = future.result()
            # process result...
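Since the question mentions sqlite3, here is a rough sketch of how the "process result" step could write to the database from the main process only, so that the workers never touch the connection and no locking is needed. The names fake_fetch and store_as_completed and the responses table are hypothetical, made up for illustration; fake_fetch stands in for http2_get and does no network I/O.

```python
import sqlite3
from concurrent.futures import as_completed


def fake_fetch(url):
    """Hypothetical stand-in for http2_get: returns (url, size) without any network I/O."""
    return url, len(url)


def store_as_completed(pool, urls, conn, worker=fake_fetch):
    """Submit one task per URL and insert each result as soon as it completes.

    Only the calling (main) process uses `conn`; workers just return data.
    Returns the list of URLs whose task raised an exception.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS responses (url TEXT PRIMARY KEY, size INTEGER)"
    )
    # Map each future back to its URL so failures can be reported.
    futures = {pool.submit(worker, url): url for url in urls}
    failed = []
    for future in as_completed(futures):
        if future.exception() is not None:
            failed.append(futures[future])
            continue
        # future.result() is the (url, size) tuple returned by the worker.
        conn.execute("INSERT OR REPLACE INTO responses VALUES (?, ?)", future.result())
        conn.commit()
    return failed
```

It would be called with the pool from the answer, for example store_as_completed(pool, urls, sqlite3.connect("results.db"), worker=http2_get) inside the with block, provided the worker returns a (url, size) tuple. With ProcessPoolExecutor the worker has to be a picklable module-level function, and on platforms that start workers by spawning, the pool should be created under an if __name__ == "__main__": guard.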