Tags: python, python-3.x, multiprocessing, python-multiprocessing

Pool.apply_async() creates and runs workers sequentially


I copied example code from a Python multiprocessing tutorial and modified it slightly:

import multiprocessing as mp
import os
import random  # needed for random.randint() when building `data` below
import time


start = time.time()

# Step 1: Redefine, to accept `i`, the iteration number
def howmany_within_range2(i, row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""

    print(os.getpid(),' going to sleep')
    time.sleep(3)
    print(os.getpid(),' waking up')
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)


# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    global results
    results.append(result)

data = []
for i in range(8):
    arr_row = []
    for j in range(5):
        arr_row.append(random.randint(0,10))
    data.append(arr_row)

k = 6
print('CPU-count: ',mp.cpu_count())
print('using this many CPUs: ',k)
pool = mp.Pool(k)
results = []

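# Step 3: Submit one task per row; `collect_result` runs in the parent process as each task finishes
for i, row in enumerate(data):
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)
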
# Step 4: Close Pool and let all the processes complete
pool.close()
pool.join()  # postpones the execution of next line of code until all processes in the queue are done.

# Step 5: Sort results [OPTIONAL]
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print('final result: ',results_final[:10])
now = time.time()
print('script total time taken: ',now-start)

And I get this unexpected output:

CPU-count:  24
using this many CPUs:  6
143259  going to sleep
143259  waking up
143258  going to sleep
143258  waking up
143260  going to sleep
143260  waking up
143256  going to sleep
143256  waking up
143255  going to sleep
143255  waking up
143255  going to sleep
143255  waking up
143257  going to sleep
143257  waking up
143257  going to sleep
143257  waking up
final result:  [2, 1, 1, 1, 1, 1, 1, 4]
script total time taken:  6.07246470451355

It appears that, for some reason, the program waits for howmany_within_range2() to complete before launching the next process. I do not understand why this is the case: from what I can read in the documentation and on the web, the worker processes should run asynchronously and in parallel, not display this sequential behavior.

Edit:

The illusion was indeed OS-based, as explained by Roland. The workers do run asynchronously. This is my output after adding the time at which each process goes to sleep and wakes up:

CPU-count:  24
using this many CPUs:  6
1579047423.7 96321  going to sleep
1579047426.7 96321  waking up
1579047423.7 96319  going to sleep
1579047426.7 96319  waking up
1579047423.7 96320  going to sleep
1579047426.7 96320  waking up
1579047423.7 96322  going to sleep
1579047426.7 96322  waking up
1579047423.7 96317  going to sleep
1579047426.7 96317  waking up
1579047426.7 96317  going to sleep
1579047429.7 96317  waking up
1579047423.7 96318  going to sleep
1579047426.7 96318  waking up
1579047426.7 96318  going to sleep
1579047429.7 96318  waking up
final result:  [4, 1, 3, 3, 2, 1, 2, 2]
script total time taken:  6.050582647323608
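
One way to check the overlap from the timestamps alone is to count how many sleep intervals are open at the same instant. The following is a minimal sketch, not part of the original script: `peak_concurrency` is a hypothetical helper, and the (start, end) pairs are copied by hand from the output above.

```python
def peak_concurrency(intervals):
    """Return the largest number of intervals that overlap at any instant."""
    events = []
    for start, end in intervals:
        events.append((start, 1))    # a worker goes to sleep
        events.append((end, -1))     # a worker wakes up
    # Sorting puts -1 before +1 at equal timestamps, so a wake-up frees
    # its slot before a new sleep with the same timestamp is counted.
    events.sort()
    running = peak = 0
    for _, delta in events:
        running += delta
        peak = max(peak, running)
    return peak


# (start, end) pairs taken from the timestamped output above.
first_round = [(1579047423.7, 1579047426.7)] * 6
second_round = [(1579047426.7, 1579047429.7)] * 2
print(peak_concurrency(first_round + second_round))  # 6
```

A peak of 6 matches the pool size, which would be 1 if the tasks had really run one after another.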

Solution

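  • Every child process sleeps for three seconds, and the total elapsed time in the parent process is about 6 seconds. That is evidence that the processes do not in fact run sequentially: with 8 tasks and 6 workers, six tasks run in a first round and the remaining two in a second round, giving 2 × 3 = 6 s. If they ran one after another, the total time would have been 8 × 3 = 24 s.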
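
The arithmetic behind those two figures can be written out as a small sketch. It assumes an idealized pool in which every task takes exactly the same time and start-up overhead is ignored; `expected_wall_time` is a hypothetical helper, not something from the multiprocessing module.

```python
import math


def expected_wall_time(n_tasks, n_workers, task_seconds):
    """Idealized wall-clock time when equal-length tasks run in rounds."""
    rounds = math.ceil(n_tasks / n_workers)  # full or partial rounds needed
    return rounds * task_seconds


parallel = expected_wall_time(8, 6, 3)    # two rounds of 3 s each
sequential = expected_wall_time(8, 1, 3)  # one worker, eight rounds
print(parallel, sequential)  # 6 24
```

The measured 6.07 s sits right next to the parallel estimate, not the sequential one.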

    Your question carries the implicit assumption that the sequence in which you see the lines appear on the terminal indicates in which order they were sent, even if they come from different processes. And on a machine with a single-core processor that might even be true. But on a modern multi-core machine I don't think there is a guarantee of that.

    Standard output is generally buffered; sys.stdout is an io.TextIOWrapper instance:

    In [1]: import sys

    In [2]: type(sys.stdout)
    Out[2]: _io.TextIOWrapper
    

    So if you use print, the output may not show up immediately unless you pass flush=True to your print calls or reconfigure the sys.stdout object before starting a child process. Even then there may be buffering at the OS level.
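
Both options might look roughly like the sketch below. `announce` is a hypothetical helper name; `flush=True` and `sys.stdout.reconfigure()` (available on `io.TextIOWrapper` since Python 3.7) are the standard-library pieces. Neither removes buffering at the OS level.

```python
import io
import sys


def announce(message, stream=None):
    """Print one line and flush Python's buffer immediately."""
    stream = sys.stdout if stream is None else stream
    print(message, file=stream, flush=True)


# Alternative: switch sys.stdout to line buffering once, before the pool
# is created, so that worker processes started afterwards inherit it.
# The hasattr() guard covers environments where sys.stdout has been
# replaced by an object without reconfigure().
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(line_buffering=True)

announce("going to sleep")
```

Inside the worker function, calling `announce(...)` in place of `print(...)` would be the smallest change.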

    AFAIK, the operating system file object produces the output in the order in which it received it. But the input comes from different processes that have all inherited the same file descriptor for sys.stdout, so I don't think there is a guarantee that this matches the order in which it was sent. Since we are talking about different processes here, the operating system scheduler also comes into play.

    To circumvent all that, you could include the time when print was called:

    def howmany_within_range2(i, row, minimum, maximum):
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    
        print(str(round(time.time(), 2)), os.getpid(), ' going to sleep')
        time.sleep(3)
        print(str(round(time.time(), 2)), os.getpid(), ' waking up')
        count = 0
        for n in row:
            if minimum <= n <= maximum:
                count = count + 1
        return (i, count)
    

    In my case, this gives:

    > python3 cputest.py
    CPU-count:  4
    using this many CPUs:  6
    1578218933.69 89875  going to sleep
    1578218933.69 90129  going to sleep
    1578218933.69 90509  going to sleep
    1578218933.69 90760  going to sleep
    1578218933.69 91187  going to sleep
    1578218933.69 91209  going to sleep
    1578218936.71 90509  waking up
    1578218936.71 89875  waking up
    1578218936.71 90760  waking up
    1578218936.71 91209  waking up
    1578218936.71 90129  waking up
    1578218936.71 91187  waking up
    1578218936.71 89875  going to sleep
    1578218936.71 90509  going to sleep
    1578218939.75 90509  waking up
    1578218939.75 89875  waking up
    final result:  [1, 4, 4, 2, 1, 2, 4, 2]
    script total time taken:  6.099705934524536
    

    This makes it clear that, at least on FreeBSD, all six worker functions are started within 1/100 of a second of each other, and finish within 1/100 of a second of each other.
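
A variation that avoids depending on terminal ordering at all is to have each worker return its own timestamps and let the parent sort them. This is only a sketch under assumptions: `timed_task`, `timeline` and `run_demo` are hypothetical names, and the worker just sleeps; it does not do the counting from the question.

```python
import multiprocessing as mp
import os
import time


def timed_task(i, seconds):
    """Hypothetical worker: sleep, then report its own start and end times."""
    started = time.time()
    time.sleep(seconds)
    return (i, os.getpid(), started, time.time())


def timeline(records):
    """Flatten worker records into events sorted by timestamp."""
    events = []
    for i, pid, started, finished in records:
        events.append((started, pid, i, "going to sleep"))
        events.append((finished, pid, i, "waking up"))
    return sorted(events)


def run_demo(n_tasks=8, n_workers=6, seconds=3):
    """Run the tasks in a pool and print the events in timestamp order."""
    with mp.Pool(n_workers) as pool:
        pending = [pool.apply_async(timed_task, (i, seconds))
                   for i in range(n_tasks)]
        records = [p.get() for p in pending]  # wait for every task
    for stamp, pid, i, what in timeline(records):
        print(f"{stamp:.2f} {pid} task {i} {what}")
```

Calling `run_demo()` under an `if __name__ == "__main__":` guard prints the events in timestamp order, however the terminal would have interleaved direct prints.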