I would like to process a large number of CSV files stored in file_list with a function called get_scores_dataframe. This function takes a second argument, phenotypes, stored in another list. The function then writes the result back to CSV files. I managed to parallelize this task using ProcessPoolExecutor(), and it works:
with concurrent.futures.ProcessPoolExecutor() as executor:
    phenotypes = [phenotype for i in range(len(file_list))]
    futures = executor.map(get_scores_dataframe, file_list, phenotypes,
                           chunksize=25)
    filenames = executor.map(os.path.basename, file_list)
    for future, filename in zip(futures, filenames):
        futures.to_csv(os.path.join(f'{output_path}', f'{filename}.csv'),
                       index=False)
As you can see, I am using a context manager and, within it, the method map(), where I can set the chunksize option. However, I would like the program to write each CSV file as soon as it finishes processing the corresponding dataframe. It appears that the context manager waits until all jobs are done and only then writes the results to the CSV files. Do you have an idea how I can achieve this?
First, executor.map does not return Future instances, so your variable futures is poorly named. It returns an iterator that yields the return values of applying get_scores_dataframe to each element of file_list in turn. Second, seeing how these return values are used next, they would appear to be dataframes that get written back out to files (which may or may not be the same files as the input arguments -- I can't be sure from the lack of code shown). Also, using the process pool's map function rather than the builtin map function just to get the base names of the filename arguments seems like overkill. Finally, in your code it would not be futures.to_csv but rather future.to_csv, so I am confused as to how your code could have worked at all.
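To illustrate the first two points, here is a minimal, self-contained sketch (square and the sample paths are just placeholders standing in for get_scores_dataframe and your real files) showing that executor.map yields plain return values, not Future instances, and that the builtin map is all you need for the base names:

import concurrent.futures
import os

def square(x):          # trivial stand-in for get_scores_dataframe
    return x * x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(square, [1, 2, 3])   # an iterator of return values, not Futures
        print(list(results))                        # -> [1, 4, 9]
    # the builtin map is sufficient (and far cheaper) for deriving base names:
    print(list(map(os.path.basename, ['/data/a.csv', '/data/b.csv'])))  # -> ['a.csv', 'b.csv']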
If you modify your function get_scores_dataframe to return a tuple consisting of a dataframe and the original passed filename argument, then we can process the results in completion order using as_completed:
import concurrent.futures
from concurrent.futures import as_completed
import multiprocessing
import os

with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count() - 1) as executor:
    futures = [executor.submit(get_scores_dataframe, file, phenotype) for file in file_list]
    for future in as_completed(futures):
        # it is assumed the return value is a tuple: (dataframe, original filename argument)
        df, file = future.result()
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(output_path, f'{csv_filename}.csv'), index=False)
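For completeness, the change to your worker would only need to be something along these lines. This is a sketch only -- your actual scoring logic is unknown to me and represented by a placeholder, and I am assuming the CSV is read with pandas:

import pandas as pd

def get_scores_dataframe(file, phenotype):
    df = pd.read_csv(file)          # assumption: the CSV is read with pandas
    # ... your existing scoring logic goes here (placeholder) ...
    scores_df = df                  # placeholder for the real result dataframe
    # return the result together with the original filename argument
    return scores_df, file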
Now, by using submit you lose the ability to "chunk" up job submissions. We can switch to using multiprocessing.Pool with imap_unordered. But imap_unordered can only pass a single argument to the worker function. So, if you are able to modify your worker to change the order of the arguments, we can make phenotype the first one and use a partial (see the functools manual):
import multiprocessing
import os
from functools import partial

POOL_SIZE = multiprocessing.cpu_count() - 1  # leave 1 for the main process

def compute_chunksize(iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
    if extra:
        chunksize += 1
    return chunksize

with multiprocessing.Pool(POOL_SIZE) as pool:
    chunksize = compute_chunksize(len(file_list))
    worker = partial(get_scores_dataframe, phenotype)
    # it is assumed that get_scores_dataframe returns a tuple: (dataframe, original filename argument)
    for df, file in pool.imap_unordered(worker, file_list, chunksize):
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(output_path, f'{csv_filename}.csv'), index=False)
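To be explicit about what the partial is doing here, a sketch under the assumption that you have reordered the worker's signature as described (the body is left as a placeholder for your existing logic):

from functools import partial

def get_scores_dataframe(phenotype, file):
    # reordered signature: the fixed phenotype first, the varying filename second
    ...  # existing scoring logic, returning (dataframe, file)

worker = partial(get_scores_dataframe, phenotype)   # binds phenotype as the first argument
# worker(file) is now equivalent to get_scores_dataframe(phenotype, file),
# i.e. a single-argument callable, which is exactly what imap_unordered requires.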