Tags: python, dictionary, nested, multiprocessing, starmap

Python - multiprocessing a nested dictionary


I have tried using this question to answer my problem, but I haven't had any success.

I'm using Python 3.10.

My dictionary is structured like this (where each list of strings is a review of the product):

{storeNameA : {productA : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string], 
               productB : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string],
               ...,
               product_n : 0 [string, string, ..., string]
                           1 [string, string, ..., string]
                           2 [string, string, ..., string]
                           ...
                           n [string, string, ..., string]},
 storeNameB : {productA : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string], 
               productB : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string],
               ...,
               product_n : 0 [string, string, ..., string]
                           1 [string, string, ..., string]
                           2 [string, string, ..., string]
                           ...
                           n [string, string, ..., string]}}

So I would access a single 'review' like dictionary['storeNameA']['productB'][0]
or dictionary['storeNameB']['productB'][2]. Each product is the same in each store.

I am trying to perform a process on each review across the entire dictionary. I can perform this successfully in an iterative manner with this code:

def mapAllValues(nestedDict, func):
    return {
        storeName: {product: func(prodFile) for product, prodFile in storeDict.items()}
        for storeName, storeDict in nestedDict.items()
    }

new_dictionary = mapAllValues(dictionary, lambda reviews: reviews.apply(processFunction))
# processFunction takes a list of strings and returns a list of tuples.
# So I end up with a new dictionary where there is now a list of tuples wherever there was a list of strings.
# {storeName : {product : 0 [(str, str), (str, str), ..., (str, str)]    and so on...

It's a pretty long dictionary, and takes ~606 seconds to complete.
So, I have tried to implement a way to run this in parallel, but it's obviously not working as I expect it to, because it runs in ~2170 seconds. I do get the right output, though.

My question is, what am I doing wrong in the following code please? Can anyone provide me a solution to this problem?

import multiprocessing

manager = multiprocessing.Manager()
d = manager.dict(dictionary)
container = manager.dict()
for key in d:
    container[key] = manager.dict()
for key in d['storeNameA']:
    container['storeNameA'][key] = manager.dict()
for key in d['storeNameB']:
    container['storeNameB'][key] = manager.dict()

with multiprocessing.Pool() as pool:
    pool.starmap(processFunction, [('storeNameA', product, d, container) for product in d['storeNameA']], chunksize=round(42739 / multiprocessing.cpu_count()))
    pool.starmap(processFunction, [('storeNameB', product, d, container) for product in d['storeNameB']], chunksize=round(198560 / multiprocessing.cpu_count()))

new_dictionary = dict(container)

I'm sure I'm misunderstanding how this is actually working, but as I see it, it should be chunking each product from each store and parallelising those?

Anyway, I think I've explained it as well as I can. If I need to clarify anything, please let me know.
Thank you in advance!


Solution

  • First of all, while creating managers is relatively cheap, accessing them can become quite expensive if you don't account for how they work. Long story short, a manager spawns a separate process and allows other processes to execute commands on any object stored inside that process. These commands are handled sequentially (execution can be somewhat parallel, since the manager uses threading internally).

    So if two or more processes attempt to access a managed object (a dictionary in this case) at the same time, one will block until the other process's request has been handled. That makes managers non-ideal for multiprocessing workloads (although they are very useful nonetheless), and definitely something to reconsider when the parallel processes need to access the managed object regularly (which I assume is the case here with processFunction).
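
    To make that cost concrete, here is a rough, self-contained sketch (timings will vary by machine) that compares repeated lookups on a plain dict with the same lookups routed through a manager proxy:

    import time
    from multiprocessing import Manager

    if __name__ == "__main__":
        plain = {i: i for i in range(1000)}

        manager = Manager()
        managed = manager.dict(plain)  # a DictProxy living in the manager's process

        t = time.time()
        for _ in range(10):
            for key in range(1000):
                _ = plain[key]  # ordinary in-process lookup
        print(f"Plain dict took {time.time() - t}")

        t = time.time()
        for _ in range(10):
            for key in range(1000):
                _ = managed[key]  # every lookup is a round trip to the manager process
        print(f"Managed dict took {time.time() - t}")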

    With that said, you do not even need managers here. From the looks of it, processFunction is a localized function which does not care about the state of the dictionary as a whole. Therefore, you only need to merge the return values from the pool into your main dictionary from within the main process itself, rather than worrying about creating shared memory for the pool to access (remember that a pool automatically passes the return value of each task back to the main process upon completion).

    Here's a way you can do that, with a sample dictionary and a stand-in processFunc, along with a benchmark comparing it against doing the same task serially.

    from multiprocessing import Pool
    import string, random, time
    
    def review_generator(size=10):
        chars = string.ascii_uppercase + string.digits
        return ''.join(random.choice(chars) for _ in range(size))
    
    def processFunc(product, prodFile):
        # Return a tuple of the product name and the altered value (each review becomes a list of tuples)
        return product, [[(element, review_generator()) for element in review] for review in prodFile]
    
    
    if __name__ == "__main__":
    
        # Generate example dictionary
        dictionary = {'storeNameA': {}, 'storeNameB': {}}
        for key, _ in dictionary.items():
            for prod_i in range(1000):
                prod = f'product{prod_i}'
                dictionary[key][prod] = [[review_generator() for _ in range(50)] for _ in range(5)]
    
        # Time the parallel approach
        t = time.time()
        with Pool() as pool:
            a = pool.starmap(processFunc, [(product, prodFile) for product, prodFile in dictionary['storeNameA'].items()])
            b = pool.starmap(processFunc, [(product, prodFile) for product, prodFile in dictionary['storeNameB'].items()])
    
        print(f"Parallel Approach took {time.time() - t}")
    
        # Time the serial approach
        t = time.time()
    
        a = [processFunc(product, prodFile) for product, prodFile in dictionary['storeNameA'].items()]
        b = [processFunc(product, prodFile) for product, prodFile in dictionary['storeNameB'].items()]
    
        print(f"Serial approach took {time.time() - t}")
    

    Output

    Parallel Approach took 1.5318272113800049
    Serial approach took 5.765411615371704
    

    Once you have the results from the sample processFunc for each store in a and b, you can then build your new dictionary in the main process itself:

    new_dictionary = {'storeNameA': {}, 'storeNameB': {}}
    for product, prodFile in a:
        new_dictionary['storeNameA'][product] = prodFile
    for product, prodFile in b:
        new_dictionary['storeNameB'][product] = prodFile
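
    Since each result is already a (product, prodFile) pair, the two loops above can equivalently be collapsed with dict():

    new_dictionary = {'storeNameA': dict(a), 'storeNameB': dict(b)}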
    

    I would also encourage you to try the other ways a pool offers to assign tasks to workers (like imap), to see whether they fit your use case better and are more efficient.
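
    As a rough sketch of that idea (reusing the sample dictionary and processFunc from above; the single-argument wrapper and the chunksize of 16 are purely illustrative), imap_unordered yields results as workers finish them, and those (product, prodFile) pairs can be fed straight into dict():

    from multiprocessing import Pool

    def processItem(item):
        # imap_unordered passes a single argument, so unpack the (product, prodFile) pair here
        product, prodFile = item
        return processFunc(product, prodFile)

    if __name__ == "__main__":
        new_dictionary = {}
        with Pool() as pool:
            for store, products in dictionary.items():
                # Consume results lazily as they become available, one store at a time
                new_dictionary[store] = dict(pool.imap_unordered(processItem, products.items(), chunksize=16))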