Tags: python, multiprocessing, cpython

Running multiple processes in each iteration of a loop in Python


I have two functions func_1 and func_2. They both take an array of integers as input.

I have a loop that creates arrays of length i in the ith iteration.

I have two predefined lists list_1 and list_2 intended to store the outputs from the functions at each iteration.

Thus, for each iteration I want to run the two functions in parallel, with the array created in that iteration as the input. Because of the GIL, CPython threads won't run CPU-bound code truly in parallel, so I'm using multiprocessing instead.

I want to create two separate processes, one for each function, at each iteration of the loop, and close the processes by the end of the iteration.

I have so far tried using Pool and Process. I can't get them to populate list_1 and list_2, even when I pass the two lists as arguments to the respective functions and append the results to them inside the functions.

How do I achieve this? Explanations will be appreciated.

Here is what I've done that's not working:

import random
import multiprocessing as mp

list_1 = []
list_2 = []

for input in range(n):
    arr_1 = [random.randint(0,100) for _ in range(input)]
    arr_2 = list(arr_1)
    proc_one = mp.Process(target=func_1, args=(arr_1, list_1))
    proc_two = mp.Process(target=func_2, args=(arr_2, list_2))
    
    proc_one.start()
    proc_two.start()
    
    proc_one.join()
    proc_two.join()
    
print(list_1)
print(list_2)

And here are the two worker functions:

def func_1(arr, outlist):
    # do something with arr
    # store the value in result
    outlist.append(result)
    
def func_2(arr, outlist):
    # do something different with arr
    # store the value in result
    outlist.append(result)

P.S. This is a simplified version of my actual code. I must pass the entire array as input to the worker functions at each iteration.


Solution

  • Processes run in different address spaces. So when func_1 or func_2 appends to list_1 or list_2, it is modifying a copy of the list that exists in its own address space; it is not updating the list that exists in the main process.

    There are several ways of handling this. One way is to make the lists shared objects (for example, manager lists) so that each child process works on the very same list created by the main process; a sketch of that approach follows this paragraph. Another way is to have the child processes write their results to a multiprocessing.Queue instance; a sketch of that variant appears after the sample output at the end of this answer. The third way, which is the simplest and the one used here, is a multiprocessing pool, which internally uses a queue to return results.
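
    For comparison, here is a minimal sketch of the shared-list approach that keeps your original Process-based structure. It uses a multiprocessing.Manager list, which is strictly a proxy object served by a manager process rather than raw shared memory; the bodies of func_1 and func_2 below are just placeholders standing in for your real work:

    import random
    import multiprocessing as mp

    def func_1(arr, outlist):
        # Placeholder work; the append goes through the manager proxy,
        # so the main process sees it.
        outlist.append(sorted(arr))

    def func_2(arr, outlist):
        outlist.append(sorted(arr, reverse=True))

    if __name__ == '__main__':
        with mp.Manager() as manager:
            # Manager-backed lists are shared by proxy between processes.
            list_1 = manager.list()
            list_2 = manager.list()
            for i in range(1, 6):
                arr = [random.randint(0, 100) for _ in range(i)]
                proc_one = mp.Process(target=func_1, args=(arr, list_1))
                proc_two = mp.Process(target=func_2, args=(arr, list_2))
                proc_one.start()
                proc_two.start()
                proc_one.join()
                proc_two.join()
            print(list(list_1))  # convert the proxies to plain lists for printing
            print(list(list_2))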

    You state that you want to "close the processes by the end of the iteration." If by "close" you mean "terminate", then the code below creates and destroys the multiprocessing pool on every iteration. That is rather wasteful, though, and I am not sure why you would want it; the commented-out block at the top of main shows how to create the pool just once instead.

    Note: You should not use input as a variable name, since input is the name of a built-in Python function. (Likewise, in is a reserved word in Python and cannot be used as a name at all.)

    import random
    import multiprocessing as mp
    import time
    
    def func_1(arr):
        t = time.process_time()
        sorted_arr = sorted(arr)
        return time.process_time() - t, sorted_arr
    
    def func_2(arr):
        t = time.process_time()
        sorted_arr = sorted(arr, reverse=True)
        return time.process_time() - t, sorted_arr
    
    def main():
        """
        # Use this order of the "for" and "with" blocks to
        # create the pool only once to avoid needless destruction
        # and re-creation of the pool:
        with mp.Pool(2) as pool:
            for i in range(1, 6): # start from 1
        """
        for i in range(100_000, 1_100_000, 100_000): # start from 100_000
            with mp.Pool(2) as pool:
                arr = [random.randint(0, 1_000_000) for _ in range(i)]
    
                # Run these two operations in parallel:
                async_result1 = pool.apply_async(func_1, args=(arr,))
                async_result2 = pool.apply_async(func_2, args=(arr,))
    
                # Get the results and print:
                t1, list_1 = async_result1.get()
                t2, list_2 = async_result2.get()
                print(f'size = {i}, arr = {arr[0:3]} ... {arr[-3:]}')
                print(f'list_1 = {list_1[0:3]} ... {list_1[-3:]}, process time = {t1}')
                print(f'list_2 = {list_2[0:3]} ... {list_2[-3:]}, process time = {t2}')
                print()
            # The pool is terminated on exit from prior "with" block
    
    if __name__ == '__main__':
        main()
    

    Prints:

    size = 100000, arr = [963985, 885752, 398168] ... [346806, 931449, 731479]
    list_1 = [3, 25, 39] ... [999975, 999981, 999992], process time = 0.015625
    list_2 = [999992, 999981, 999975] ... [39, 25, 3], process time = 0.015625
    
    size = 200000, arr = [998892, 442816, 338664] ... [914664, 469598, 779263]
    list_1 = [2, 16, 17] ... [999978, 999982, 999984], process time = 0.03125
    list_2 = [999984, 999982, 999978] ... [17, 16, 2], process time = 0.046875
    
    size = 300000, arr = [329332, 623887, 967070] ... [848851, 937626, 842896]
    list_1 = [3, 5, 7] ... [999994, 999994, 999995], process time = 0.078125
    list_2 = [999995, 999994, 999994] ... [7, 5, 3], process time = 0.078125
    
    size = 400000, arr = [94596, 348994, 79236] ... [135108, 522948, 124498]
    list_1 = [0, 2, 8] ... [999997, 999998, 999999], process time = 0.09375
    list_2 = [999999, 999998, 999997] ... [8, 2, 0], process time = 0.09375
    
    size = 500000, arr = [597997, 143252, 612416] ... [271672, 825160, 351058]
    list_1 = [3, 3, 5] ... [999992, 999997, 999997], process time = 0.140625
    list_2 = [999997, 999997, 999992] ... [5, 3, 3], process time = 0.125
    
    size = 600000, arr = [559728, 577812, 640277] ... [143838, 231247, 242004]
    list_1 = [1, 3, 5] ... [999993, 999994, 1000000], process time = 0.171875
    list_2 = [1000000, 999994, 999993] ... [5, 3, 1], process time = 0.171875
    
    size = 700000, arr = [237665, 926271, 747910] ... [206456, 577997, 613106]
    list_1 = [0, 2, 2] ... [999999, 1000000, 1000000], process time = 0.1875
    list_2 = [1000000, 1000000, 999999] ... [2, 2, 0], process time = 0.21875
    
    size = 800000, arr = [164147, 141817, 369944] ... [723224, 958884, 223842]
    list_1 = [0, 1, 3] ... [999995, 999999, 999999], process time = 0.265625
    list_2 = [999999, 999999, 999995] ... [3, 1, 0], process time = 0.28125
    
    size = 900000, arr = [168167, 942110, 354510] ... [261892, 747797, 579109]
    list_1 = [0, 1, 1] ... [999999, 1000000, 1000000], process time = 0.28125
    list_2 = [1000000, 1000000, 999999] ... [1, 1, 0], process time = 0.28125
    
    size = 1000000, arr = [720988, 161719, 803588] ... [710593, 309087, 699567]
    list_1 = [0, 0, 3] ... [1000000, 1000000, 1000000], process time = 0.328125
    list_2 = [1000000, 1000000, 1000000] ... [3, 0, 0], process time = 0.3437
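
    For completeness, here is a minimal sketch of the multiprocessing.Queue alternative mentioned above, again with placeholder work in the two functions; each worker tags its result so the main process knows which function produced it:

    import random
    import multiprocessing as mp

    def func_1(arr, q):
        # Tag the result so the parent knows which worker produced it.
        q.put(('func_1', sorted(arr)))

    def func_2(arr, q):
        q.put(('func_2', sorted(arr, reverse=True)))

    if __name__ == '__main__':
        list_1 = []
        list_2 = []
        q = mp.Queue()
        for i in range(1, 6):
            arr = [random.randint(0, 100) for _ in range(i)]
            proc_one = mp.Process(target=func_1, args=(arr, q))
            proc_two = mp.Process(target=func_2, args=(arr, q))
            proc_one.start()
            proc_two.start()
            # Drain both results before joining so the children are not
            # blocked trying to flush their queue data.
            for _ in range(2):
                name, result = q.get()
                (list_1 if name == 'func_1' else list_2).append(result)
            proc_one.join()
            proc_two.join()
        print(list_1)
        print(list_2)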