Tags: python, python-3.x, multithreading, python-multithreading

Avoiding race condition while using ThreadPoolExecutor


I have the following method concurrent_api_call_and_processing() that takes the parameters below:

  • api_call: an HTTP request to an external website that retrieves an XML document
  • lst: a list of integers (ids) needed by api_call
  • callback_processing: a local method that just parses each XML response

I make around 500 HTTP requests, one for each id in lst, using api_call(). Each response is then processed with the local method callback_processing(), which parses the XML and returns a tuple.

from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed


def concurrent_api_call_and_processing(api_call=None, callback_processing=None, lst=None, workers=5):
    """
    :param api_call: Function that will be called concurrently. An API call to API_Provider for each entry.
    :param lst: List of finding ids needed by the API function to call the API_Provider endpoint.
    :param callback_processing: Function that will be called after we get the response from the above API call.
    :param workers: Number of concurrent threads that will be used.
    :return: Queue of tuples containing the details of each particular finding.
    """
    output = Queue()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
        for future in as_completed(future_to_f_detail):
            finding_id = future_to_f_detail[future]
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {finding_id} generated an exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.put(f_det)
    return output
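
For context, here is a minimal sketch of how this might be wired up. api_call and callback_processing below are hypothetical stand-ins for the real API call and XML parser, and api.example.com is a placeholder URL:

import xml.etree.ElementTree as ET
import urllib.request


def api_call(finding_id):
    # Hypothetical: fetch the XML document for one finding id.
    url = f"https://api.example.com/findings/{finding_id}"
    with urllib.request.urlopen(url) as resp:
        return resp.read()


def callback_processing(xml_payload):
    # Hypothetical: parse the XML and return a tuple of the fields of interest.
    root = ET.fromstring(xml_payload)
    return (root.findtext("id"), root.findtext("severity"), root.findtext("title"))


results = concurrent_api_call_and_processing(
    api_call=api_call,
    callback_processing=callback_processing,
    lst=list(range(1, 501)),
    workers=5,
)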

I started to notice some random issues (no graceful termination) while using this method.

Since I was using a list instead of a queue (output = []) and was unsure whether I could have a race condition, I decided to refactor the code and start using a Queue (output = Queue()).

My question is:

  • Is my code, as it is now, free of race conditions?

NOTE: Following Raymond Hettinger's keynote on concurrency (PyBay 2017), I added fuzz() sleep calls for testing, but I could not determine whether I actually had a race condition.
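
fuzz() is not shown above; a minimal sketch of that kind of helper, assuming it simply injects a short random sleep to make thread interleavings more visible (as in the talk), could look like this:

import random
import time

FUZZ = True


def fuzz():
    # Sleep for a short random interval so that thread switches are more
    # likely to happen between the operations under test.
    if FUZZ:
        time.sleep(random.random() * 0.1)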


Solution

  • Under the above conditions, there won't be a race condition in that code. As per the concurrent.futures docs, here is what happens:

    1. executor.submit(): returns a Future object representing the execution of the callable.
    2. as_completed(future_to_f_detail): returns an iterator over the Future instances given by future_to_f_detail that yields futures as they complete (finished or cancelled futures).

    So the for loop is indeed consuming the iterator, receiving one by one every future yielded by as_completed().

    So unless the callback or the function we call introduces some kind of async functionality (as in the example described by @dm03514 above), we are simply working synchronously inside the for loop:

       counter = 0
       output = []
       with ThreadPoolExecutor(max_workers=workers) as executor:
           future_to_f_detail = {executor.submit(api_call, id): id for id in lst}
           for future in as_completed(future_to_f_detail):
               print(f"Entering the for loop for the {counter + 1} time")
               counter += 1
               finding_id = future_to_f_detail[future]
               try:
                   find_details = future.result()
               except Exception as exc:
                   print(f"Finding {finding_id} generated an exception: {exc}")
               else:
                   f_det = callback_processing(find_details)
                   output.append(f_det)
       return output
    

    If we have a list of 500 ids and make 500 calls, and every call yields a future, we will print that message 500 times, once each time before entering the try block.

    We are not forced to use a Queue to avoid a race condition in this case: only the thread running the for loop ever touches output. Futures create deferred execution; when we call submit() we get back a Future to be used later.
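
    A small, self-contained sketch of that behaviour (fake_api_call is an illustrative stand-in that sleeps instead of doing HTTP): submit() returns a Future immediately, and as_completed() hands each future back to the single consuming thread as it finishes, so appending to a plain list there is safe:

        import time
        from concurrent.futures import ThreadPoolExecutor, as_completed

        def fake_api_call(i):
            # Stand-in for the real HTTP call: sleep a little, then return a value.
            time.sleep(0.01 * (i % 5))
            return i * 10

        output = []
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_id = {executor.submit(fake_api_call, i): i for i in range(20)}
            # Only this thread consumes the futures and appends to output,
            # so no lock or Queue is required for output itself.
            for future in as_completed(future_to_id):
                output.append((future_to_id[future], future.result()))

        print(len(output))  # 20, one entry per submitted id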

    Some important notes and recommendations:

    1. Ramalho, Luciano: Fluent Python, Chapter 17, "Concurrency with Futures".
    2. Beazley, David: Python Cookbook, Chapter 12, "Concurrency", page 516, "Defining an Actor Task".