python, python-3.x, functional-programming, python-multiprocessing

map() with partial arguments: reducing memory use


I have a very large list of dictionaries whose keys are (string, float, string) triples and whose values are lists. cols_to_aggr is basically a list(defaultdict(list)).

I would like to pass my function _compute_aggregation not only the list index i but also just the data stored at that index, namely cols_to_aggr[i], instead of passing the whole cols_to_aggr structure and extracting the smaller chunk inside the parallelized function. Passing the whole data structure makes my Pool eat up all my memory without any gain in efficiency.

with multiprocessing.Pool(processes=n_workers, maxtasksperchild=500) as pool:
    results = pool.map(
        partial(_compute_aggregation, cols_to_aggr=cols_to_aggr,
                aggregations=aggregations, pivot_ladetag_pos=pivot_ladetag_pos,
                to_ix=to_ix), cols_to_aggr)

def _compute_aggregation(index, cols_to_aggr, aggregations, pivot_ladetag_pos, to_ix):
    data_to_process = cols_to_aggr[index]

As a workaround for the memory issue I tried setting maxtasksperchild, but without success; I have no idea how to choose a good value for it.


Solution

  • Using dict.values(), you can iterate over the values of a dictionary.

    So you could change your code to:

    with multiprocessing.Pool(processes=n_workers, maxtasksperchild=500) as pool:
        results = pool.map(
            partial(_compute_aggregation,
                    aggregations=aggregations, pivot_ladetag_pos=pivot_ladetag_pos,
                    to_ix=to_ix), cols_to_aggr.values())
    
    def _compute_aggregation(value, aggregations, pivot_ladetag_pos, to_ix):
        data_to_process = value
    

    If you still need the keys in your _compute_aggregation function, use dict.items() instead.