Search code examples
pythonmultithreadingmultiprocessingthreadpool

Python multiprocessing - apply_async not working


I am trying to parallelize a code with a ThreadPool. I am currently working on windows. Basically, the behavior that I am getting is that when I call apply_async nothing happens. My program just print START and END.

Below there is an example:

import glob
import itertools
import pandas as pd
from multiprocessing.dummy import Pool as ThreadPool 


def ppp(window,day):
    print(window,day)


#%% Reading datasets
print('START')
tree = pd.read_csv('datan\\days.csv')
days = list(tree.columns)
windows = [2000]
processes_args = list(itertools.product(windows, days))


pool = ThreadPool(8) 
results = pool.apply_async(ppp, processes_args)
pool.close() 
pool.join() 
print('END')

There are many questions on stack that suggest calling other methods, like imap_unordered, map, apply. However, none of them solve the problem.

Edit:

results.get()

returns an error about the number of parameters:

TypeError: ppp() takes 2 positional arguments but 10 were given

However, the documentation states that I can use a list of tuples for passing parameters, otherwise how can I pass them?

Edit2:

processes_args look likes the output below before calling apply_async:

[(2000, '0808'),
 (2000, '0810'),
 (2000, '0812'),
 (2000, '0813'),
 (2000, '0814'),
 (2000, '0817'),
 (2000, '0818'),
 (2000, '0827'),
 (2000, '0828'),
 (2000, '0829')]

Solution

  • Positional parameters in Pool.apply and Pool.apply_async are expanded using the * unpacking syntax.

    According to processed_args content, your ppp function would receive 10 tuples when scheduled via apply_async.

    If you want to process an iterable, I'd recommend you to use Pool.map or Pool.map_async. The map functions do not expand the arguments within the iterable. You need to take care of it yourself.

    def ppp(element):
        window, day = element
        print(window, day)
    
    pool.map(ppp, processed_args)
    

    If you want to keep the ppp function as is, you can use Pool.starmap which applies argument expansion on the iterator content.