python-3.x concurrency concurrent.futures

concurrent.futures.ThreadPoolExecutor -- Checking on Status


I have a question that is nearly identical to this one:

Checking up on a `concurrent.futures.ThreadPoolExecutor`

There is a twist, however. Instead of using executor.submit, I am using the map method (since I have a variable number of tasks and there could be a lot). Here's the relevant code:

    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        list_of_chain_responses = list(executor.map(chain, question_list))
        print('pending:', executor._work_queue.qsize(), 'jobs')
        print('threads:', len(executor._threads))
        print()

I tried using the technique shown in response to the above-cited question. But it shows the jobs as '0' and only prints it once. I assume (perhaps incorrectly?) that is because it only prints the jobs after the entire question_list has been fed asynchronously to the chain function call?

Basically, I am just trying to gain some insight into how much (if at all) this is speeding up the process. That is the objective. I've tried running with and without threading to check, but there are other things going on with my network that make this a fairly inexact way to measure the advantage. I also tried putting print("start") and print("end") in the chain function itself to see if that gives me any insight, but it appears that the executor only "calls" the chain function once (because I only see the start and end once) and nevertheless processes it multiple times using the question list.


Solution

  • executor.map returns an iterator. Calling list(executor.map(...)) consumes that iterator and blocks until every call has returned, so all the work is done before the prints run.

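    Instead, iterate over the iterator and print the status after each result. Note that _work_queue and _threads are private attributes of the executor, so they may change between Python versions: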

    import concurrent.futures
    from time import perf_counter as pf, sleep
    
    def chain(n):
        sleep(.1)  # serially, 30 calls should take 3 seconds
        return n * 2
    
    start = pf()
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        iterator = executor.map(chain, range(30))
        for result in iterator:
            print(f'result:{result:2d} pending:{executor._work_queue.qsize():2d} threads:{len(executor._threads)}')
    print(f'parallel time: {pf() - start:.2f}s')
    

    Output:

    result: 0 pending:21 threads:8
    result: 2 pending:14 threads:8
    result: 4 pending:14 threads:8
    result: 6 pending:14 threads:8
    result: 8 pending:14 threads:8
    result:10 pending:14 threads:8
    result:12 pending:14 threads:8
    result:14 pending:14 threads:8
    result:16 pending:13 threads:8
    result:18 pending: 6 threads:8
    result:20 pending: 6 threads:8
    result:22 pending: 6 threads:8
    result:24 pending: 6 threads:8
    result:26 pending: 6 threads:8
    result:28 pending: 6 threads:8
    result:30 pending: 6 threads:8
    result:32 pending: 5 threads:8
    result:34 pending: 0 threads:8
    result:36 pending: 0 threads:8
    result:38 pending: 0 threads:8
    result:40 pending: 0 threads:8
    result:42 pending: 0 threads:8
    result:44 pending: 0 threads:8
    result:46 pending: 0 threads:8
    result:48 pending: 0 threads:8
    result:50 pending: 0 threads:8
    result:52 pending: 0 threads:8
    result:54 pending: 0 threads:8
    result:56 pending: 0 threads:8
    result:58 pending: 0 threads:8
    parallel time: 0.42s
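
The status line above reads executor._work_queue and executor._threads, which are private attributes. If that is a concern, progress can also be tracked with only the public API by submitting each call and counting futures as they finish. This is a minimal sketch under assumptions, not a drop-in replacement: run_with_progress is a hypothetical helper name, and chain is the same sleep-based stand-in used above.

```python
import concurrent.futures
from time import sleep


def chain(n):
    sleep(0.1)  # stand-in for an I/O-bound call (assumption)
    return n * 2


def run_with_progress(func, items, max_workers=8):
    """Hypothetical helper: run func over items in a thread pool,
    printing a progress line each time a call finishes."""
    items = list(items)
    results = [None] * len(items)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember each future's input position so results keep input order,
        # even though as_completed yields futures in completion order.
        future_to_index = {
            executor.submit(func, item): index for index, item in enumerate(items)
        }
        completed = concurrent.futures.as_completed(future_to_index)
        for done, future in enumerate(completed, start=1):
            results[future_to_index[future]] = future.result()
            print(f'completed {done:2d}/{len(items)}')
    return results


if __name__ == '__main__':
    doubled = run_with_progress(chain, range(30))
    print(doubled[:5])
```

Unlike executor.map, this holds one Future per item at once, which is usually fine for thousands of tasks but worth keeping in mind for very large inputs.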
    

    Make sure chain is doing I/O-bound work. sleep simulates that because it releases the global interpreter lock (GIL) while it waits. In standard CPython, the GIL lets only one thread execute Python bytecode at a time, so CPU-bound work won't get a speed-up from threads. ProcessPoolExecutor can speed up CPU-bound work.
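
Since the goal is to measure how much threading helps, one option is to time the same workload serially and through the pool, using a deterministic stand-in so network noise does not skew the comparison. The following is a rough sketch under assumptions: time_serial and time_threaded are hypothetical helper names, and the sleep-based chain only imitates an I/O-bound call; the real chain function may behave differently.

```python
import concurrent.futures
from time import perf_counter, sleep


def chain(n):
    sleep(0.05)  # stand-in for an I/O-bound call (assumption)
    return n * 2


def time_serial(func, items):
    """Call func on each item one after another; return (results, seconds)."""
    start = perf_counter()
    results = [func(item) for item in items]
    return results, perf_counter() - start


def time_threaded(func, items, max_workers=8):
    """Call func on each item through a thread pool; return (results, seconds)."""
    start = perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(func, items))
    return results, perf_counter() - start


if __name__ == '__main__':
    items = list(range(16))
    serial_results, serial_time = time_serial(chain, items)
    threaded_results, threaded_time = time_threaded(chain, items)
    print(f'serial:   {serial_time:.2f}s')
    print(f'threaded: {threaded_time:.2f}s')
    print(f'speed-up: {serial_time / threaded_time:.1f}x')
```

Both helpers return the results as well as the elapsed time, so it is easy to confirm that the threaded run produces the same output as the serial one before trusting the timing.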