
Python multiprocessing in a closure function


In case 1, test.py (code below), I get the result I expect for the variable "res".

(res[-1] is the value I want.)

test.py:

from multiprocessing import Pool as ProcessPool

skus = [i for i in range(4)]

def call_func(i):
    skus[i] = i * 10
    return skus

def process():
    with ProcessPool() as pool:
        res = pool.map(call_func, skus, chunksize=2)

        print(f"process result={res}")

if __name__ == "__main__":
    [func() for func in (process, )]

Output:

process result=[[0, 10, 2, 3], [0, 10, 2, 3], [0, 10, 20, 30], [0, 10, 20, 30]]

However, in case 2 I split the code across two files:

  1. main.py
  2. process.py

I run main.py, which calls a function from process.py.

In process.py, I use a multiprocessing pool to do the work. (I use "pathos.multiprocessing", which avoids the "cannot pickle" problem for nested functions.)
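
A minimal illustration of that pickling problem: the standard pickle module cannot serialize a function defined inside another function, while dill, which pathos uses, can:

import pickle

def make_closure():
    def call_func():
        pass
    return call_func

try:
    pickle.dumps(make_closure())   # standard pickle fails on a nested function
except AttributeError as e:
    print(e)                       # Can't pickle local object 'make_closure.<locals>.call_func'

# import dill
# dill.dumps(make_closure())       # dill (used by pathos) handles it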

But the list "skus" defined in the outer() function is not shared across the processes the way it was in case 1.

I expected the process result to look like case 1.

Where is the problem in case 2?

How can I modify the code?

main.py:

from process import outer

def main():
    outer()

if __name__ == "__main__":
    main()

process.py:

from pathos.multiprocessing import Pool as ProcessPool
from decorate import timeit

@timeit
def outer():
    skus = [i for i in range(4)]

    def call_func(i):
        skus[i] = i * 10
        return skus

    @timeit
    def process():
        with ProcessPool() as pool:
            res = pool.map(call_func, skus, chunksize=2)

        print("process result=", res)
    
    return process()

Output:

process result=[[0, 10, 2, 3], [0, 10, 2, 3], [0, 1, 20, 30], [0, 1, 20, 30]]

Solution

  • Your question is a bit confusing, but I believe it rests on a misunderstanding on your part. You say that for case 1, "I can get the expected result for variable res". Actually, you have no reason to expect any one specific result, because what is actually returned is nondeterministic. Let me explain:

    Each child process in the pool has its own copy of the list skus; the list is not shared across process address spaces. You are submitting 4 tasks to the pool, where i takes on the values 0, 1, 2 and 3. Because you have specified a chunksize value of 2, tasks are batched together: when a pool process is idle, it retrieves two submitted tasks at once, here the ones with i equal to 0 and 1. That single pool process will modify its copy of skus to [0, 10, 2, 3], having multiplied the first element, i.e. 0, by 10 to yield 0, and then the second element, i.e. 1, by 10 to yield 10. If it performs these two tasks very quickly, it becomes idle again very quickly and can pull the next chunk of two tasks off the task queue. But depending on the timing, it is possible for another process in the pool to pull off this second chunk instead, and that process will then return [0, 1, 20, 30].

    First, I will just modify the function call_func to print the current process id and the contents of skus before modification, and then print the result it is returning after modification:

    from multiprocessing import Pool as ProcessPool
    import os
    
    skus = [i for i in range(4)]
    
    def call_func(i):
        print(os.getpid(), 'found:', skus, flush=True)
        skus[i] = i * 10
        print('returning:', skus)
        return skus
    
    def process():
        with ProcessPool() as pool:
            res = pool.map(call_func, skus, chunksize=2)
    
            print(f"process result={res}")
    
    if __name__ == "__main__":
        [func() for func in (process, )]
    

    Prints:

    16912 found: [0, 1, 2, 3]
    returning: [0, 1, 2, 3]
    16912 found: [0, 1, 2, 3]
    returning: [0, 10, 2, 3]
    16912 found: [0, 10, 2, 3]
    returning: [0, 10, 20, 3]
    16912 found: [0, 10, 20, 3]
    returning: [0, 10, 20, 30]
    process result=[[0, 10, 2, 3], [0, 10, 2, 3], [0, 10, 20, 30], [0, 10, 20, 30]]
    

    In this run, even though the pool has a default size of 8, a single pool process pulled off both chunks from the input queue, so it repeatedly modified the same skus instance. But note that what it prints as its return value is not what is actually delivered back to the main process: not only are the tasks chunked, the return values are chunked as well. The results for a chunk are only sent back once the whole chunk has been processed, and because every result in a chunk is a reference to the same skus list, you see:

    [[0, 10, 2, 3], [0, 10, 2, 3], [0, 10, 20, 30], [0, 10, 20, 30]]

    instead of the expected:

    [[0, 1, 2, 3], [0, 10, 2, 3], [0, 10, 20, 3], [0, 10, 20, 30]]
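
    The mechanics are easy to reproduce without a pool: because call_func returns the list object itself rather than a snapshot of it, every result slot in a chunk holds a reference to the same list, and later mutations show through every slot. A minimal illustration (my sketch, not part of the original code):

    results = []
    skus = [0, 1, 2, 3]
    for i in (0, 1):              # simulate one chunk of two tasks in a single process
        skus[i] = i * 10
        results.append(skus)      # appends a reference, not a copy
    print(results)                # [[0, 10, 2, 3], [0, 10, 2, 3]] -- both slots alias skus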

    Now let me add a call to time.sleep in call_func, which will most likely result in two separate pool processes each working on one chunk of two tasks:

    from multiprocessing import Pool as ProcessPool
    import os
    import time
    
    skus = [i for i in range(4)]
    
    def call_func(i):
        time.sleep(.1)
        print(os.getpid(), 'found:', skus, flush=True)
        skus[i] = i * 10
        print('returning:', skus)
        return skus
    
    def process():
        with ProcessPool() as pool:
            res = pool.map(call_func, skus, chunksize=2)
    
            print(f"process result={res}")
    
    if __name__ == "__main__":
        [func() for func in (process, )]
    

    Prints:

    19140 found: [0, 1, 2, 3]
    8728 found: [0, 1, 2, 3]
    returning: [0, 1, 2, 3]
    returning: [0, 1, 20, 3]
    8728 found: [0, 1, 20, 3]
    19140 found: [0, 1, 2, 3]
    returning: [0, 1, 20, 30]
    returning: [0, 10, 2, 3]
    process result=[[0, 10, 2, 3], [0, 10, 2, 3], [0, 1, 20, 30], [0, 1, 20, 30]]
    

    As you can see, each process is working on its own copy of skus and is not affected by any modifications made by the other process.

    If we change the chunksize value to 4, guaranteeing that one process will handle all submitted tasks, then the output is:

    12728 found: [0, 1, 2, 3]
    returning: [0, 1, 2, 3]
    12728 found: [0, 1, 2, 3]
    returning: [0, 10, 2, 3]
    12728 found: [0, 10, 2, 3]
    returning: [0, 10, 20, 3]
    12728 found: [0, 10, 20, 3]
    returning: [0, 10, 20, 30]
    process result=[[0, 10, 20, 30], [0, 10, 20, 30], [0, 10, 20, 30], [0, 10, 20, 30]]
    

    Note that all 4 returned lists are identical! They are all references to the same, fully modified skus list.
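
    If the goal is to capture each task's intermediate state, the aliasing can be removed by returning a snapshot instead of the live list. A small sketch of that change (my modification, not part of the original code):

    def call_func(i):
        skus[i] = i * 10
        return skus.copy()    # snapshot taken at call time, immune to later mutations

    With chunksize=4, so that a single process handles all the tasks in order, pool.map would then return [[0, 1, 2, 3], [0, 10, 2, 3], [0, 10, 20, 3], [0, 10, 20, 30]]; with smaller chunk sizes the snapshots remain subject to the same race conditions described above.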

    And if we change the chunksize to 1 and keep the time delay, so that 4 separate processes each handle one task, we get this output:

    21412 found: [0, 1, 2, 3]
    22180 found: [0, 1, 2, 3]
    3296 found: [0, 1, 2, 3]
    4824 found: [0, 1, 2, 3]
    returning: [0, 10, 2, 3]
    returning: [0, 1, 2, 3]
    returning: [0, 1, 20, 3]
    returning: [0, 1, 2, 30]
    process result=[[0, 1, 2, 3], [0, 10, 2, 3], [0, 1, 20, 3], [0, 1, 2, 30]]
    

    To summarize: the case 1 results you saw occurred only because a single process happened to pull both submitted chunks off the queue before a second pool process could grab one; in your case 2 run, two different processes each grabbed one chunk. Both outcomes are the product of a nondeterministic race condition that could have produced different results on another run.
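
    If you really do need every pool process to operate on one shared list, one option is a managed list from multiprocessing.Manager. Below is a minimal sketch of that approach, my addition rather than part of the original code; the manager proxy is picklable, so it can be passed to the workers as an argument:

    from multiprocessing import Manager, Pool

    def call_func(args):
        skus, i = args        # unpack the list proxy and the index
        skus[i] = i * 10      # the write goes through the manager process, visible to all workers
        return list(skus)     # per-call snapshot; intermediate contents are still timing-dependent

    def process():
        with Manager() as manager:
            skus = manager.list(range(4))
            with Pool() as pool:
                res = pool.map(call_func, [(skus, i) for i in range(4)], chunksize=2)
            print("final skus =", list(skus))    # deterministically [0, 10, 20, 30]
            print("process result =", res)       # snapshots are still race-dependent

    if __name__ == "__main__":
        process()

    The final contents of the shared list are deterministic because each task writes to its own index; the per-call snapshots, however, still depend on scheduling.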