python multiprocessing .get() never ends


I have an issue with multiprocessing in my Python script. My code is:

def filter_list_of_list_values(myList,myFilterList):
    for index in range(len(myList)):
        print(index)
        sub_array = myList[index]
        for stopword in myFilterList :
            sub_array = list(filter(lambda a: a != stopword, sub_array))
        sub_array = [w for w in sub_array if not w in myFilterList]
        myList[index] = sub_array
    return myList  

import multiprocessing
import numpy as np
#Run the work in parallel and split the list into as many parts as there are processes:
N_PROCS = 6
#Split the list:
L_sub_lists  = np.array_split(tokenized_text, N_PROCS)


final_List = []
start_time = time.time()


print('Creating pool with %d processes\n' % N_PROCS)
with multiprocessing.Pool(N_PROCS) as pool:
    #Build a list of tasks that each call the same function, but
    #with a different sub-list
    TASKS = [(sub_list, english_stopwords) for sub_list in L_sub_lists]
    print("TASK OK")

    results = [pool.apply_async(filter_list_of_list_values, t) for t in TASKS]
    print("results OK")
    
    final_results = [r.get() for r in results]
    print("final_results OK")

    for sub_list_res in final_results:
        print("appending")
        final_List+= sub_list_res
        print("list_append")
    
print("--- %s seconds ---" % (time.time() - start_time))

The script gets stuck at:

final_results = [r.get() for r in results]

I really don't understand why, because I used almost the same script in another context (a different function, applied to a DataFrame rather than a list of lists) and everything worked fine.

An example:

L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
 ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
 ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]

filter_list_of_list_values(L,['Lyon','Turin','Chicago'])

results in:

[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille'], ['New-York', 'NapleWashington', 'Las Vegas'], ['Rome', 'Venise', 'Naple']]

Solution

  • I took a look at this, and your code looks almost right. What is missing is how you build the input. As posted, the example fails because time is never imported and tokenized_text is never defined, so I can't tell what the input is supposed to look like. However, with the example data from your question the code does work, so I suspect that whatever you are doing to build the input is incorrect.
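
One way to test that suspicion before involving a pool is to check the shape of the input. The sketch below is only an illustration: `describe_input` is a hypothetical helper, and it assumes the input is meant to be a list of lists of strings, as in the question's example.

```python
def describe_input(tokenized_text):
    """Return a list of problems found in the input; empty means it looks fine.

    Hypothetical helper: assumes the input should be a list of lists of str.
    """
    problems = []
    if not isinstance(tokenized_text, list):
        problems.append("top level is %s, expected list"
                        % type(tokenized_text).__name__)
        return problems
    for i, row in enumerate(tokenized_text):
        if not isinstance(row, list):
            problems.append("row %d is %s, expected list"
                            % (i, type(row).__name__))
            continue
        for j, token in enumerate(row):
            if not isinstance(token, str):
                problems.append("row %d, item %d is %s, expected str"
                                % (i, j, type(token).__name__))
    return problems


# Well-formed input: no problems reported.
print(describe_input([['Paris', 'Monaco'], ['Lyon', 'Rome']]))
# Malformed input: the second row is a bare string, not a list.
print(describe_input([['Paris'], 'Lyon']))
```

If this reports problems for the real data, the input is worth fixing before looking at the pool itself.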

    Here is a working version of your code:

    import time
    import multiprocessing
    import numpy as np
    
    N_PROCS = 6
    # L_sub_lists = np.array_split(tokenized_text, N_PROCS)
    
    final_List = []
    start_time = time.time()
    
    L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
         ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
         ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]
    
    filter_list = ['Lyon', 'Turin', 'Chicago']
    
    
    def filter_list_of_list_values(myList, myFilterList):
        for index in range(len(myList)):
            sub_array = myList[index]
            for stop_word in myFilterList:
                sub_array = list(filter(lambda a: a != stop_word, sub_array))
            sub_array = [w for w in sub_array if w not in myFilterList]
            myList[index] = sub_array
        return myList  
    
    
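    # The function modifies its argument in place, so pass a copy here;
    # otherwise the pool below would receive lists that are already filtered.
    print(filter_list_of_list_values([list(sub_list) for sub_list in L], filter_list))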
    
    print('Creating pool with %d processes\n' % N_PROCS)
    with multiprocessing.Pool(N_PROCS) as pool:
        # Wrap each sub-list in its own list so every task gets a list of lists
        TASKS = [([sub_list], filter_list) for sub_list in L]
        print("TASK OK")
    
        results = [pool.apply_async(filter_list_of_list_values, t) for t in TASKS]
        print("results OK")
    
        print("Getting final results")
        final_results = [r.get() for r in results]
        print("final_results OK")
    
    print("Printing final_results %s" % final_results)
    print("--- %s seconds ---" % (time.time() - start_time))
    

    It simply breaks your big list of lists into smaller ones and processes those in subprocesses. I call your main function once beforehand to confirm the output you expect, and then print the pool's results to check that the distributed processing returns the same values. I think that is the goal, though I'm not sure, because the question doesn't say much beyond "this code doesn't work" and "I expect these outputs".

    Here is the script output:

    [['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille'], ['New-York', 'NapleWashington', 'Las Vegas'], ['Rome', 'Venise', 'Naple']]
    Creating pool with 6 processes
    
    TASK OK
    results OK
    Getting final results
    final_results OK
    Printing final_results [[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille']], [['New-York', 'NapleWashington', 'Las Vegas']], [['Rome', 'Venise', 'Naple']]]
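
The same split-and-filter idea can also be sketched without NumPy. This is a variant under stated assumptions, not the code from the answer: `filter_rows`, `chunk` and `run_parallel` are hypothetical helper names, the worker returns a new list instead of modifying its argument, and the pool is created under an `if __name__ == "__main__":` guard, which the multiprocessing documentation calls for when workers are started with the spawn method.

```python
import multiprocessing


def filter_rows(rows, stopwords):
    """Return a copy of rows with every stopword removed from each row."""
    stop = set(stopwords)  # set membership is faster than scanning a list
    return [[w for w in row if w not in stop] for row in rows]


def chunk(rows, n_chunks):
    """Split rows into at most n_chunks contiguous slices of near-equal size."""
    n_chunks = max(1, min(n_chunks, len(rows)))
    size, extra = divmod(len(rows), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks each take one additional row.
        end = start + size + (1 if i < extra else 0)
        chunks.append(rows[start:end])
        start = end
    return chunks


def run_parallel(rows, stopwords, n_procs):
    """Filter rows in n_procs worker processes and return one flat list."""
    tasks = [(part, stopwords) for part in chunk(rows, n_procs)]
    with multiprocessing.Pool(n_procs) as pool:
        # starmap unpacks each (part, stopwords) tuple into the worker's
        # arguments and blocks until every task has finished.
        results = pool.starmap(filter_rows, tasks)
    # Flatten the per-chunk results back into a single list of rows.
    return [row for part in results for row in part]


if __name__ == "__main__":
    L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
         ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
         ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]
    print(run_parallel(L, ['Lyon', 'Turin', 'Chicago'], 3))
```

Because the results are flattened at the end, this version returns a plain list of rows, without the extra nesting level that appears in the output above.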