
Cannot join output of all Pool processes


I cannot collect the output of all Pool processes. The resulting dictionary probably contains the output of only 1 of 3 parallel processes (288 of 411 elements). Here is my code.

from itertools import repeat
from multiprocessing import Pool, Manager

def reader(entry, outcomes): #other args are constant
    .............................................
    #Some condition:
    prediction = min(distances) + (mlc_data[min(distances)[1]],)
    outcomes[entry_pairs[' '.join(entry)]] = prediction
    return outcomes

manager = Manager()
outcomes = manager.dict()
with Pool(3) as p:
    output = p.starmap(reader, zip(input[0], repeat(outcomes)))
    p.close()
    p.join()

In the end, I have only 288 of 411 elements in the outcomes dictionary. On the other hand, output contains 411**2 elements, which probably means my code is not well optimized.


Solution

  • You are returning the entire, ever-growing outcomes dictionary with each call to reader. Each returned dict is serialized, sent to the parent process, deserialized, and appended to output; that is why output holds roughly 411**2 elements: 411 copies of a dict that grows toward 411 entries. This is a huge cost, especially considering that the manager is also moving data between processes to keep the dict in sync. reader could return prediction and its key instead, and let the parent process build the result dictionary. This also lets you check every returned key for duplicates; if several entries map to the same key, later writes silently overwrite earlier ones, which would explain why the managed dict ends up with only 288 of 411 entries.

    from multiprocessing import Pool
    
    def reader(entry): #other args are constant
        #Some condition:
        prediction = (min_d := min(distances)) + (mlc_data[min_d[1]],)
        return entry_pairs[' '.join(entry)], prediction
    
    outcomes = {}
    with Pool(3) as p:
        for key, prediction in p.map(reader, input[0]):
            if key in outcomes:
                print("ERROR, key duplicated:", key)
            else:
                outcomes[key] = prediction
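
  • For reference, here is a minimal, self-contained sketch of the same pattern that you can run as-is. The worker score and the list work_items are made-up stand-ins for the real reader, entry_pairs, distances, and mlc_data, which live in your surrounding context. Workers return (key, value) pairs and the parent assembles the dict, so no Manager is needed at all:

    from multiprocessing import Pool

    def score(entry):
        # Hypothetical stand-in for the real reader(): each worker
        # returns a (key, value) pair instead of a shared dict.
        key = ' '.join(entry)
        prediction = (len(key), entry[0])
        return key, prediction

    if __name__ == '__main__':
        # The last item deliberately duplicates a key.
        work_items = [('a', 'b'), ('c', 'd'), ('a', 'b')]
        outcomes = {}
        with Pool(3) as p:
            for key, prediction in p.map(score, work_items):
                if key in outcomes:
                    print("ERROR, key duplicated:", key)
                else:
                    outcomes[key] = prediction
        print(outcomes)

    Dropping the Manager also removes a layer of inter-process synchronization, so each worker only pays the cost of pickling its own return value.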