Search code examples
pythondictionaryparallel-processingpython-multiprocessingjoblib

Parallelize Python Dictionary Comprehension


I'm trying to parallelize the subsetting of a Python dictionary. The code below creates a new dictionary, positions_sub, based on if the keys in positions dictionary are found in a list, node_list:

positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

This code works just fine and does exactly what I want. However, it takes a while to run so I'm trying to parallelize it. I was trying to do this in the code below, but it returns positions_sub as a list of dictionaries, which isn't what I want. There are also some issues with the number of values per key. Any ideas how to get this working? Thanks!

from joblib import Parallel, delayed

def dict_filter(k,v):
    if k in node_list:
        positions_sub[k] = v
    return positions_sub
positions_sub = Parallel(n_jobs=-1,)(delayed(dict_filter)(k,v)for k,v in positions.items())

Solution

  • Before you resort to parallelization you should make sure you are using the right data structure for each task: Remember that x in list is essentially O(n) whereas x in set (and also x in dict) is more like O(1). Therefore just converting your node_list to a set can improve the performance tremendously.

    node_list = set(node_list)
    positions_sub = {}
    for k,v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    

    An other thing to consider is the ratio between len(positions) and len(node_list). If one is substantially smaller than the other you should always iterate over the smaller one.


    EDIT: some code for performance comparisons

    import random
    import timeit
    import functools
    
    def generate(n_positions=1000, n_node_list=100):
        positions = { i:i for i in random.sample(range(n_positions), n_positions) }
        node_list = random.sample(range(max(n_positions, n_node_list)), n_node_list)
        return positions, node_list  
    
    def validate(variant):
        data = generate(1000, 100)
        if sorted(data[1]) != sorted(k for k in variant(*data)):
            raise Exception(f"{variant.__name__} failed")
    
    def measure(variant, data, repeats=1000):
        total_seconds = timeit.Timer(functools.partial(variant, *data)).timeit(repeats)
        average_ms = total_seconds / repeats * 1000
        print(f"{variant.__name__:10s} took an average of {average_ms:0.2f}ms per pass over {repeats} passes" )   
    
    
    def variant1(positions, node_list):
        positions_sub = {}
        for k,v in positions.items():
            if k in node_list:
                positions_sub[k] = v
        return positions_sub
    
    def variant1b(positions, node_list):
        node_list = set(node_list)
        positions_sub = {}
        for k,v in positions.items():
            if k in node_list:
                positions_sub[k] = v
        return positions_sub
    
    def variant2(positions, node_list):
        return {k:v for k,v in positions.items() if k in node_list}
    
    def variant2b(positions, node_list):
        node_list = set(node_list)
        return {k:v for k,v in positions.items() if k in node_list}
    
    def variant3(positions, node_list):
        return {k:positions[k] for k in node_list if k in positions}
    
    
    
    if __name__ == "__main__":
        variants = [variant1,variant1b,variant2,variant2b,variant3]
        for variant in variants:
            validate(variant)      
    
        n_positions = 4000
        n_node_list = 1000
        n_repeats = 100
        data = generate(n_node_list, n_node_list)
        print(f"data generated with len(positions)={n_positions} and len(node_list)={n_node_list}")
        for variant in variants:
            measure(variant, data, n_repeats)
    

    EDIT2: as requested, here some results on my machine

    first run:
    data generated with len(positions)=4000 and len(node_list)=1000
    variant1   took an average of 6.90ms per pass over 100 passes
    variant1b  took an average of 0.22ms per pass over 100 passes
    variant2   took an average of 6.95ms per pass over 100 passes
    variant2b  took an average of 0.12ms per pass over 100 passes
    variant3   took an average of 0.19ms per pass over 100 passes
    
    second run:
    data generated with len(positions)=40000 and len(node_list)=10000
    variant1   took an average of 738.23ms per pass over 10 passes
    variant1b  took an average of   2.04ms per pass over 10 passes
    variant2   took an average of 739.51ms per pass over 10 passes
    variant2b  took an average of   1.52ms per pass over 10 passes
    variant3   took an average of   1.85ms per pass over 10 passes
    

    Note that n=len(positions) and m=len(node_list) have been selected such that the ratio n/m=4 is roughly equivalent to that of the original data which has been specified by OP as 1.2M for n and 300K for m.

    Observe the effect of scaling up by a factor of 10 from the first to the second run: Where in the first run variant1b is about 31 times faster than variant1, in the second run it is 361 times faster! This is the expected result of reducing the complexity of the k in node_list from O(m) to O(1). The total time complexity of variant1 is n*m = 0.25*n^2 = O(n^2) whereas variant1b has only n*1 = O(n). This means that for every order of magnitude that n increases, variant1b is also an order of magnitude faster than variant1.

    That a similar performance improvement can be achieved by parallelization alone is rather doubtful, as by and large the expected performance gain of an embarrassingly parallelizable problem is a multiple of the available CPUs, which is still a constant factor and nowhere near the gain of improving the algorithm from O(n^2) to O(n).

    Also, while IMHO the given problem falls into the class of embarrassingly parallelizable problems, the output must be aggregated after the parallel processing before it can be used. Furthermore I'm quite unfamiliar with joblib which is why I have skipped adding it to the comparison.