Tags: python, parallel-processing, python-multiprocessing, python-multithreading, joblib

Bug or feature? Unable to run two consecutive multiprocessing steps in a Python script


I have two consecutive functions that process large lists.

I call them one after another, using joblib's Parallel and delayed, in an attempt to speed up each function individually.

However, as soon as Parallel calls function_2, I see the output of function_1 again, and I don't understand why. In a nutshell, function_2 never gets called.

The main code:

from mycode import function_1, function_2
from joblib import Parallel, delayed
import gc

if __name__ == '__main__':
    items = list_1  # avoid shadowing the built-in name `list`
    print(">>> First call")
    Parallel(n_jobs=-1)(delayed(function_1)(item) for item in items)
    gc.collect()
    do_other_stuff()
    items = list_2
    print(">>> Second call")
    Parallel(n_jobs=-1, backend='threading')(delayed(function_2)(item) for item in items)

Threaded functions:

def function_1(item):  # Gets called first
    print("this comes from function 1")

def function_2(item):  # Gets called second
    print("this comes from function 2")

Output:

>>> First call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1
>>> Second call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1

My hypothesis is that some part of function_1 is retained in memory after the call (possibly due to joblib's memory-mapping / sharing feature?).

This is why I call gc.collect() between the calls. Since this doesn't help, I am considering reloading the modules between calls (joblib, Parallel, delayed), which seems ugly.

Has anyone experienced similar behavior (on Windows)?

Is there some fix?

Do I need to unload/reload the joblib or mycode modules between the Parallel steps, and if so, why?


Solution

  • I had the same issue.

    My code looked like:

    A = Parallel(n_jobs=1)(delayed(self.function_1)( df_1, item ) for item in list_of_items)
    
    B = Parallel(n_jobs=1)(delayed(self.function_2)( df_2, item ) for item in list_of_items)
    

    where "list_of_items" variable had 2 items.

    But the output was...

    [Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:   32.2s finished
    [Parallel(n_jobs=1)]: Done   0 out of   0 | elapsed:    0.0s finished
    

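    The second Parallel call did no work (at least in my case) because my "list_of_items" was a generator instead of a list! A generator can be iterated only once: the first Parallel call consumed it, so the second call received zero items, which is exactly what the "Done 0 out of 0" line shows. Converting it to a list, e.g. list(list_of_items), before the first call lets both calls see every item.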

    I hope this solves your problem too.. :)
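
The generator explanation in this answer can be reproduced without joblib. The sketch below is only an illustration under stated assumptions: run_step is a hypothetical stand-in for Parallel(n_jobs=1)(delayed(func)(item) for item in items), which likewise just iterates over the items once, and the two worker functions and the item values are made up for the demo (Python 3 syntax).

```python
def run_step(func, items):
    # Hypothetical stand-in for
    #   Parallel(n_jobs=1)(delayed(func)(item) for item in items)
    # Like that call, it iterates over `items` exactly once.
    return [func(item) for item in items]


def function_1(item):
    return "function 1 saw %s" % item


def function_2(item):
    return "function 2 saw %s" % item


# Case 1: the items come from a generator, which can be consumed only once.
list_of_items = (name for name in ["a", "b"])
A = run_step(function_1, list_of_items)  # consumes both items
B = run_step(function_2, list_of_items)  # generator is already exhausted
print(len(A), len(B))

# Case 2: materialize the items into a list before the first step.
list_of_items = list(name for name in ["a", "b"])
A_fixed = run_step(function_1, list_of_items)
B_fixed = run_step(function_2, list_of_items)
print(len(A_fixed), len(B_fixed))
```

In the first case the second step gets an empty result, which mirrors the "Done 0 out of 0" log line; in the second case both steps process both items. A quick way to check whether a variable is such a one-shot iterator is `iter(x) is x`, which is True for generators and False for lists.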