
Python Multiprocessing - How to use Pool to run on two functions


I am just figuring out how to use Python multiprocessing and have the following code:

import multiprocessing as mp

res1 = set()
res2 = set()
nums = [1, 2, 3, 4, 5]

def squared(x):
    res1.add(x * x)

def cubed(x):
    res2.add(x * x * x)

def main():
    pool = mp.Pool(mp.cpu_count())

    pool.map(squared, nums)
    print(res1, res2)

    pool.map(cubed, nums)
    print(res1, res2)

if __name__ == "__main__":
    main()

Result:

set() set()
set() set()

In the end I just want res1 and res2 to be updated: first res1, using squared in parallel for each number in nums, and once that has finished, then res2, using cubed in parallel for each number in nums.

Does anyone know how to make this work?


Solution

  • I am assuming you are aware that, because your worker functions squared and cubed are so trivial, using multiprocessing will actually result in a longer execution time than doing the work serially. I will also assume that your real worker functions are CPU-intensive enough that what you gain by running them in parallel more than offsets the additional overhead incurred by multiprocessing. As for why your code prints empty sets: each pool worker runs in a separate process with its own copy of the module-level res1 and res2, so the add calls update the workers' copies and never touch the sets in the main process.

    That said, there are multiple approaches you can take:

    Have the Main Process Perform the Set Processing

    This was suggested by Michael Butscher:

    import multiprocessing as mp
    
    def squared(x):
        return x * x
    
    def cubed(x):
        return x * x * x
    
    def main():
        nums = [1, 2, 3, 4, 5]
    
        pool = mp.Pool(mp.cpu_count())
    
        res1 = set(pool.map(squared, nums))
        res2 = set(pool.map(cubed, nums))
        print(res1)
        print(res2)
    
    if __name__ == "__main__":
        main()
    

    Prints:

    {1, 4, 9, 16, 25}
    {64, 1, 8, 27, 125}
    
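    This is not part of the original answer, but the same "collect the results in the main process" idea carries over directly to concurrent.futures if you prefer that API; a minimal sketch:

    from concurrent.futures import ProcessPoolExecutor

    def squared(x):
        return x * x

    def cubed(x):
        return x * x * x

    def main():
        nums = [1, 2, 3, 4, 5]

        # executor.map yields the results in order; wrapping it in set()
        # builds the de-duplicated result in the main process, as above
        with ProcessPoolExecutor() as executor:
            res1 = set(executor.map(squared, nums))
            res2 = set(executor.map(cubed, nums))

        print(res1)
        print(res2)

    if __name__ == "__main__":
        main()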

    Use a Managed Dictionary As a Set Replacement

    If all you are doing is adding elements to a set (to prevent duplicates), you can achieve the same result with a managed dictionary since its keys must be unique:

    import multiprocessing as mp
    from functools import partial
    
    def squared(res1, x):
        res1[x * x] = None # The actual value does not matter
    
    def cubed(res2, x):
        res2[x * x * x] = None
    
    def main():
        nums = [1, 2, 3, 4, 5]
    
        pool = mp.Pool(mp.cpu_count())
        with mp.Manager() as manager:
            res1 = manager.dict()
            res2 = manager.dict()
    
            pool.map(partial(squared, res1), nums)
            pool.map(partial(cubed, res2), nums)
    
            print(list(res1.keys()))
            print(list(res2.keys()))
    
            # Or create actual sets:
            s1 = set(res1)
            s2 = set(res2)
            print(s1)
            print(s2)
    
    if __name__ == "__main__":
        main()
    

    Prints:

    [4, 1, 16, 9, 25]
    [64, 125, 1, 27, 8]
    {1, 4, 9, 16, 25}
    {64, 1, 8, 27, 125}
    
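    As a variation on the above (my own sketch, not part of the original answer), the managed-dictionary proxies can also be handed to each worker once through the pool's initializer instead of being bound to every task with functools.partial; the workers then see them as module-level globals:

    import multiprocessing as mp

    def init_worker(d1, d2):
        # Runs once in every worker process; keep the proxies as worker globals
        global res1, res2
        res1, res2 = d1, d2

    def squared(x):
        res1[x * x] = None

    def cubed(x):
        res2[x * x * x] = None

    def main():
        nums = [1, 2, 3, 4, 5]

        with mp.Manager() as manager:
            res1 = manager.dict()
            res2 = manager.dict()

            with mp.Pool(mp.cpu_count(), initializer=init_worker,
                         initargs=(res1, res2)) as pool:
                pool.map(squared, nums)  # blocks until all squares are done
                pool.map(cubed, nums)

            print(set(res1.keys()))
            print(set(res2.keys()))

    if __name__ == "__main__":
        main()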

    Create Your Own Managed Set

    import multiprocessing as mp
    from multiprocessing.managers import BaseManager
    from functools import partial
    
    class SetManager(BaseManager):
        pass
    
    def squared(res1, x):
        res1.add(x * x)
    
    def cubed(res2, x):
        res2.add(x * x * x)
    
    def main():
        nums = [1, 2, 3, 4, 5]
    
        SetManager.register('Set', set)
    
        pool = mp.Pool(mp.cpu_count())
        with SetManager() as manager:
            res1 = manager.Set()
            res2 = manager.Set()
    
            pool.map(partial(squared, res1), nums)
            pool.map(partial(cubed, res2), nums)
    
            print(res1)
            print(res2)
    
    if __name__ == "__main__":
        main()
    

    Prints:

    {1, 4, 9, 16, 25}
    {64, 1, 8, 27, 125}
    

    Note that the proxy that is automatically generated above for the managed set only exposes the set's public methods (add, clear, remove, union, etc.); the operator special methods are not proxied, so an in-place operation such as res1 |= res2 will not work on the proxies.
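
    If you do need to combine results from two managed sets, one workaround (again my own sketch, not from the original answer) is to pull plain-set copies back into the main process through one of the proxied methods, such as copy(), and do the set algebra there:

    from multiprocessing.managers import BaseManager

    class SetManager(BaseManager):
        pass

    def main():
        SetManager.register('Set', set)

        with SetManager() as manager:
            res1 = manager.Set([1, 4, 9, 16, 25])
            res2 = manager.Set([1, 8, 27, 64, 125])

            # copy() is a public set method, so the proxy exposes it; its
            # return value travels back to the caller as an ordinary set
            combined = res1.copy() | res2.copy()
            print(combined)

    if __name__ == "__main__":
        main()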