
How to share a dictionary in memory between different processes?


Is it possible to share a dictionary variable between processes in Python multiprocessing (pathos.multiprocessing)?

I use the following code, but it doesn't work as expected.

I expect skus to be {0: 0, 1: 10, ...}

from pathos.multiprocessing import Pool as ProcessPool

def outer():
    skus = {}

    def process(skus, sku):
        skus[sku] = sku * 10

    with ProcessPool() as pool:
        pool.starmap(process, ((skus, sku) for sku in range(100)), chunksize=3)

    print(skus)

if __name__ == "__main__":
    outer()

Output:

{}

So I used Manager().dict() as my variable, but now I get a different error.

Where is the problem, and how can I correctly share a dict between processes?

from pathos.multiprocessing import Pool as ProcessPool
from multiprocessing import Manager

def outer():
    manager = Manager()
    skus = manager.dict()

    def process(sku):
        skus[sku] = sku * 10

    with ProcessPool() as pool:
        pool.map(process, range(100), chunksize=3)

    print(skus)

if __name__ == "__main__":
    outer()

Output: (Error)

....
raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected

Solution

  • A managed dictionary does let multiple processes share a dictionary, but it carries considerable overhead. Every method call a process makes on the dictionary actually invokes a method of a proxy object; the real dictionary lives in a separate process that is created when multiprocessing.Manager() is invoked. The proxy must therefore serialize the method name and its arguments and send them over a socket or named pipe to the manager's process, where the request is deserialized and applied to the actual dictionary.
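
To make the proxy overhead concrete, the following sketch times the same assignments against a plain dict and against a managed dict from within a single process. The helper name `time_updates` and the count of 10,000 are illustrative choices, not part of the original answer, and the actual timings depend on the machine and platform.

```python
import time
from multiprocessing import Manager


def time_updates(mapping, n):
    """Store n key/value pairs in mapping and return the elapsed seconds."""
    start = time.perf_counter()
    for sku in range(n):
        # For a proxy, each assignment is one request sent to the manager process.
        mapping[sku] = sku * 10
    return time.perf_counter() - start


def main():
    n = 10_000  # illustrative size
    print(f"plain dict:   {time_updates({}, n):.4f} sec.")

    # Manager() starts a separate server process that owns the real dictionary.
    with Manager() as manager:
        proxy = manager.dict()
        print(type(proxy).__name__)  # the object held here is a proxy, not a dict
        print(f"managed dict: {time_updates(proxy, n):.4f} sec.")


if __name__ == "__main__":
    main()
```

Both calls run the same loop, so any difference in the two timings comes from the round trip each proxy call makes to the manager process.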

    So in these situations you should ask whether a sharable dictionary is really necessary at all. Your worker function process only sets a single key of the dictionary and does not need to know the other keys and values, so it can simply do its calculation and return the key and value to the main process, which then builds the dictionary:

    from multiprocessing import Pool as ProcessPool
    
    def process(sku):
        return sku, sku * 10
    
    def outer():
        with ProcessPool() as pool:
            skus = {sku: result for sku, result in pool.map(process, range(100), chunksize=3)}
    
        print(skus)
    
    if __name__ == "__main__":
        outer()
    

    Prints:

    {0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60, 7: 70, 8: 80, 9: 90, 10: 100, 11: 110, 12: 120, 13: 130, 14: 140, 15: 150, 16: 160, 17: 170, 18: 180, 19: 190, 20: 200, 21: 210, 22: 220, 23: 230, 24: 240, 25: 250, 26: 260, 27: 270, 28: 280, 29: 290, 30: 300, 31: 310, 32: 320, 33: 330, 34: 340, 35: 350, 36: 360, 37: 370, 38: 380, 39: 390, 40: 400, 41: 410, 42: 420, 43: 430, 44: 440, 45: 450, 46: 460, 47: 470, 48: 480, 49: 490, 50: 500, 51: 510, 52: 520, 53: 530, 54: 540, 55: 550, 56: 560, 57: 570, 58: 580, 59: 590, 60: 600, 61: 610, 62: 620, 63: 630, 64: 640, 65: 650, 66: 660, 67: 670, 68: 680, 69: 690, 70: 700, 71: 710, 72: 720, 73: 730, 74: 740, 75: 750, 76: 760, 77: 770, 78: 780, 79: 790, 80: 800, 81: 810, 82: 820, 83: 830, 84: 840, 85: 850, 86: 860, 87: 870, 88: 880, 89: 890, 90: 900, 91: 910, 92: 920, 93: 930, 94: 940, 95: 950, 96: 960, 97: 970, 98: 980, 99: 990}
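
Because each result carries its own key, the order in which results arrive does not matter. As a variation on the code above (not part of the original answer), the following sketch feeds `Pool.imap_unordered` directly into `dict()`, which avoids building the intermediate list that `pool.map` returns. The function name `build_skus` is hypothetical.

```python
from multiprocessing import Pool


def process(sku):
    # Return the key together with its value so the parent can build the dict.
    return sku, sku * 10


def build_skus(n, chunksize=3):
    with Pool() as pool:
        # dict() consumes the (key, value) pairs as the workers deliver them.
        return dict(pool.imap_unordered(process, range(n), chunksize=chunksize))


if __name__ == "__main__":
    skus = build_skus(100)
    print(len(skus), skus[99])
```

The resulting dictionary has the same contents as before, although its insertion order may differ from run to run.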
    

    Benchmark

    We will take the above approach but process 1_000_000 key/value pairs instead, and let the pool choose its own chunksize:

    from multiprocessing import Pool as ProcessPool
    from timing import time_it
    
    def process(sku):
        return sku, sku * 10
    
    @time_it
    def outer():
        with ProcessPool() as pool:
            skus = {sku: result for sku, result in pool.map(process, range(1_000_000))}
    
        #print(skus)
    
    if __name__ == "__main__":
        outer()
    

    Prints:

    func: outer args: [(), {}] took: 0.7160062 sec.
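
The `timing` module imported in these benchmarks is not part of the standard library, and its source is not shown here. A minimal sketch of a `time_it` decorator that would print output in the format above, assuming it simply wraps the call with `time.perf_counter`, might look like this (the `add` function is only a placeholder for demonstration):

```python
import functools
import time


def time_it(func):
    """Print how long each call takes (hypothetical stand-in for timing.time_it)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"func: {func.__name__} args: [{args}, {kwargs}] took: {elapsed:.7f} sec.")
        return result  # pass the wrapped function's return value through
    return wrapper


@time_it
def add(a, b):
    return a + b


if __name__ == "__main__":
    add(1, 2)
```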
    

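    Here is a version where the process worker function does not need to send back the key. This works because pool.map returns its results in the same order as its inputs, and the keys here are simply 0 through 999_999, so enumerate can recover them: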

    from multiprocessing import Pool as ProcessPool
    from timing import time_it
    
    def process(sku):
        return sku * 10
    
    
    @time_it
    def outer():
        with ProcessPool() as pool:
            skus = {i: result for i, result in enumerate(pool.map(process, range(1_000_000)))}
    
        #print(skus)
    
    if __name__ == "__main__":
        outer()
    

    Prints:

    func: outer args: [(), {}] took: 0.5001733 sec.
    

    And here we use a managed dictionary as a global variable:

    from multiprocessing import Pool as ProcessPool, Manager
    from timing import time_it
    
    def init_pool_processes(d):
        global skus
        skus = d
    
    def process(sku):
        skus[sku] = sku * 10
    
    @time_it
    def outer():
        skus = Manager().dict()
        with ProcessPool(initializer=init_pool_processes, initargs=(skus,)) as pool:
            pool.map(process, range(1_000_000))
    
        #print(skus)
    
    if __name__ == "__main__":
        outer()
    

    Prints:

    func: outer args: [(), {}] took: 60.8297226 sec.
    

    And instead of making skus a global variable, we now pass it explicitly as an additional argument to process:

    from multiprocessing import Pool as ProcessPool, Manager
    from functools import partial
    from timing import time_it
    
    def process(skus, sku):
        skus[sku] = sku * 10
    
    @time_it
    def outer():
        skus = Manager().dict()
        with ProcessPool() as pool:
            pool.map(partial(process, skus), range(1_000_000))
    
        #print(skus)
    
    if __name__ == "__main__":
        outer()
    

    Prints:

    func: outer args: [(), {}] took: 60.7377072 sec.
    

    Results:

    Accessing a global variable is generally slower than accessing a local variable. But process accesses skus only once, and that access is a tiny fraction of the total processing required when using a managed dictionary, so it makes little difference which way the managed dictionary is passed: the running time is approximately 61 seconds, versus 0.5 to 0.7 seconds when not using a managed dictionary at all.
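
If a managed dictionary cannot be avoided, the per-call cost described above suggests making fewer proxy calls. The sketch below is a variation that was not benchmarked in this answer: each task computes a batch of values locally and writes them with a single `update` call. The names `process_batch` and `batches` and the sizes used are hypothetical.

```python
from functools import partial
from multiprocessing import Manager, Pool


def process_batch(skus, batch):
    # Compute locally, then make one proxy call for the whole batch.
    skus.update({sku: sku * 10 for sku in batch})


def batches(n, size):
    """Yield consecutive ranges that together cover range(n)."""
    for start in range(0, n, size):
        yield range(start, min(start + size, n))


def outer(n=100_000, size=10_000):
    with Manager() as manager:
        skus = manager.dict()
        with Pool() as pool:
            pool.map(partial(process_batch, skus), batches(n, size))
        # copy() returns a plain dict in one call, before the manager shuts down.
        return skus.copy()


if __name__ == "__main__":
    print(len(outer()))
```

This reduces the number of round trips to the manager from one per key to one per batch, at the price of each worker no longer seeing updates key by key.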