Tags: python, python-3.x, python-multiprocessing, shared-memory, ragged

multiprocessing.Pool sharing a large list of lists read-only in memory across child processes


I'm struggling with this problem.

I have a big list of lists that I want to access from parallel code to perform CPU-intensive operations. To do that I'm trying to use multiprocessing.Pool; the problem is that I also need this massive list of lists to be visible across my child processes.

As the 'list of lists' is not regular (e.g. [[1, 2], [1, 2, 3]]) I can't store it as an mp.Array, and, as previously said, since I'm not using mp.Process I haven't figured out a way of using mp.Manager for this task. It's important to me to keep this list of lists, because I'm applying a function that queries it by indexes using from operator import itemgetter.
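For illustration, this is the indexed-lookup pattern I mean (standard-library itemgetter; the sample data is made up):

from operator import itemgetter

big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
# itemgetter(1, 2) builds a callable that fetches positions 1 and 2 in one call
print(itemgetter(*[1, 2])(big_list_of_lists))  # ([3, 1, 3], [1, 2])
# note: with a single index, itemgetter returns the bare element, not a 1-tuple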

Here is a fictitious example of what I'm trying to achieve:

import multiprocessing as mp
from operator import itemgetter
import numpy as np

def foo(indexes):
    # here I must guarantee read access to big_list_of_lists in every child process somehow;
    # a module-level global would work, but each child gets its own copy,
    # which fails for larger data.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the variable that I want to share across my child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]

    ctx = mp.get_context('spawn')
    # big_list_of_lists elements are also passed as args
    pool = ctx.Pool(mp.cpu_count())
    res = list(pool.map(foo, big_list_of_lists))
    pool.close()
    pool.join()

    return res

if __name__ == '__main__':
    print(main())
# desired output is equivalent to:
# a = []
# for i in big_list_of_lists:
#     store_tuples = itemgetter(*i)(big_list_of_lists)
#     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
# 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
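To clarify the global-variable remark in foo's comments, here is a minimal, hypothetical sketch (the names BIG and first_item are mine) of why plain globals are readable under 'spawn', the start method used on Windows, yet don't actually share memory: each child re-imports the module and rebuilds the data.

import multiprocessing as mp

BIG = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]  # rebuilt in every spawned child via re-import

def first_item(i):
    return BIG[i][0]

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(2) as pool:
        print(pool.map(first_item, range(len(BIG))))  # [1, 3, 1, 2]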

Other details: the solution should preferably work with Python 3.6 and must work on Windows.

Thank you very much!


Solution

  • It seems to work fine for me using mp.Manager, with an mp.Manager.list of mp.Manager.lists. I believe this will not copy the lists to every process.

    The important line is:

    big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
    

    Depending on your use case, you may want to use instead:

    big_list_of_lists_proxy = manager.list(big_list_of_lists)
    

    Whether every sublist should be a proxy depends on whether each sublist is large, and on whether it is read in its entirety. If a sublist is large, then transferring the whole list object to each process that needs it is expensive (O(n) in its length), so a proxy list from the manager should be used. However, if every element is going to be needed anyway, there is no advantage to using a proxy. (A short demo of how proxy access behaves follows the full listing below.)

    import multiprocessing as mp
    from operator import itemgetter
    import numpy as np
    from functools import partial
    
    
    def foo(indexes, big_list_of_lists):
        # here I must guarantee read access to big_list_of_lists in every child process somehow;
        # a module-level global would work, but each child gets its own copy,
        # which fails for larger data.
        store_tuples = itemgetter(*indexes)(big_list_of_lists)
        return np.mean([item for sublista in store_tuples for item in sublista])
    
    
    def main():
        # big_list_of_lists is the variable that I want to share across my child processes
        big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
        ctx = mp.get_context('spawn')
        with ctx.Manager() as manager:
            big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
            # big_list_of_lists elements are also passed as args
            pool = ctx.Pool(mp.cpu_count())
            res = list(pool.map(partial(foo, big_list_of_lists=big_list_of_lists_proxy), big_list_of_lists))
            pool.close()
            pool.join()
    
        return res
    
    
    if __name__ == '__main__':
        print(main())
    # desired output is equivalent to:
    # a = []
    # for i in big_list_of_lists:
    #     store_tuples = itemgetter(*i)(big_list_of_lists)
    #     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
    # 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
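
    One practical note on reading from the proxies (a minimal sketch, with illustrative names): retrieving a nested managed list gives you back a ListProxy, and each indexed access is a separate round trip to the manager process, so slicing to copy a sublist into a plain local list in one call is usually cheaper:

    import multiprocessing as mp

    if __name__ == '__main__':
        with mp.Manager() as manager:
            outer = manager.list([manager.list([1, 3]), manager.list([3, 1, 3])])
            inner = outer[0]  # a ListProxy, not a plain list (nesting works since Python 3.6)
            local = inner[:]  # one round trip copies the sublist into a plain local list
            print(local)      # [1, 3]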