I have the following method concurrent_api_call_and_processing() that takes the parameters described below. I make around 500 HTTP requests, one for each id in lst, using api_call(). Each response is then processed with the local method callback_processing(), which parses the XML and returns a tuple:
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed


def concurrent_api_call_and_processing(api_call=None, callback_processing=None, lst=None, workers=5):
    """
    :param api_call: Function that will be called concurrently. An API call to API_Provider for each entry.
    :param lst: List of finding ids needed by the API function to call the API_Provider endpoint.
    :param callback_processing: Function that will be called on the response of each API call.
    :param workers: Number of concurrent threads that will be used.
    :return: Queue of tuples containing the details of each particular finding.
    """
    output = Queue()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
        for future in as_completed(future_to_f_detail):
            f_id = future_to_f_detail[future]  # recover the id belonging to this future
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {f_id} generated an exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.put(f_det)
    return output
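For context, here is a minimal, self-contained usage sketch. The fake_api_call and fake_callback_processing functions below are hypothetical stand-ins for the real HTTP client and XML parser, and the driver mirrors the signature from the question:

```python
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_api_call(finding_id):
    # stand-in for the real HTTP request to API_Provider
    return f"<finding><id>{finding_id}</id></finding>"


def fake_callback_processing(xml_response):
    # stand-in for the real XML parsing; returns a tuple like the original
    start = xml_response.find("<id>") + len("<id>")
    end = xml_response.find("</id>")
    return (xml_response[start:end], len(xml_response))


def concurrent_api_call_and_processing(api_call=None, callback_processing=None, lst=None, workers=5):
    # condensed version of the question's driver, for demonstration only
    output = Queue()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, i): i for i in lst}
        for future in as_completed(future_to_f_detail):
            output.put(callback_processing(future.result()))
    return output


q = concurrent_api_call_and_processing(
    api_call=fake_api_call,
    callback_processing=fake_callback_processing,
    lst=["a", "b", "c"],
)
results = [q.get() for _ in range(q.qsize())]
```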
I started to notice some random issues (no graceful termination) while using this method. As I was using an array (output = []) instead of a queue, and was in doubt whether I could have a race condition, I decided to refactor the code and use a Queue (output = Queue()).
My question is: could there be a race condition in this code when collecting the results into a plain list?
NOTE: following Raymond Hettinger's keynote on concurrency (PyBay 2017), I added fuzz() sleep calls for testing, but could not determine whether I actually had a race condition or not.
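For reference, the fuzzing idea from that keynote can be sketched as a small helper that sleeps for a random interval, widening the windows in which thread switches can occur. This fuzz() is my own minimal version, not Hettinger's exact code:

```python
import random
import time

FUZZ = True


def fuzz():
    # sleep a random fraction of a second so thread interleavings vary
    # between runs, making latent race conditions more likely to surface
    if FUZZ:
        time.sleep(random.random() / 10)


def unsafe_increment(state):
    # a classic read-modify-write: with fuzz() in the middle, two threads
    # can both observe the same old value and one update gets lost
    old = state["counter"]
    fuzz()
    state["counter"] = old + 1
```

Sprinkling fuzz() between statements like this makes an existing race far more likely to show up in a handful of runs, rather than once in thousands.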
Under the above conditions, there won't be a race condition in that code. As per the concurrent.futures docs, what happens is this: the for loop consumes the as_completed() iterator, which yields each future one by one as it completes.
So unless the callback or the function we call introduces some kind of async behavior (as in the example described by @dm03514 above), the body of the for loop runs synchronously in the caller's thread:
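A quick way to convince yourself of this (a sketch, not the original code) is to record which thread executes the loop body. Everything after future.result() runs in the thread that iterates over as_completed(), not in a worker:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(n):
    # this part runs in a worker thread
    return n * n


main_ident = threading.get_ident()
callback_threads = set()

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(work, n): n for n in range(10)}
    for future in as_completed(futures):
        result = future.result()
        # the "callback processing" step runs here, in the main thread
        callback_threads.add(threading.get_ident())

print(callback_threads == {main_ident})  # the loop body never ran in a worker
```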
counter = 0
output = []  # a plain list is fine here: only this loop appends to it
with ThreadPoolExecutor(max_workers=workers) as executor:
    future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
    for future in as_completed(future_to_f_detail):
        counter += 1
        print(f"Entering the for loop for the {counter} time")
        f_id = future_to_f_detail[future]  # recover the id belonging to this future
        try:
            find_details = future.result()
        except Exception as exc:
            print(f"Finding {f_id} generated an exception: {exc}")
        else:
            f_det = callback_processing(find_details)
            output.append(f_det)
return output
If we have an array of 500 ids and we make 500 calls, and every call yields a future, we will print that message 500 times, once before each pass through the try block.
We are not forced to use a Queue to avoid a race condition in this case. Futures create deferred execution: when we use submit(), we get back a future to be used later. Since each result is consumed one at a time in this single loop, appending to a plain list there is safe.
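That deferred execution can be illustrated with a minimal sketch of my own: submit() returns a Future immediately, and result() blocks the caller until the worker finishes.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(n):
    # simulates a slow API call
    time.sleep(0.2)
    return n * 2


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_double, 21)  # returns a Future immediately
    # future.done() is almost certainly False here: the work is still running
    value = future.result()  # blocks until the worker thread finishes
    # after result() returns, the future is complete and value holds 42
```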
Some important notes and recommendations: