Search code examples
pythonmultiprocessingpython-multiprocessing

Performance Issue with Pool method of multiprocessing


I have a column in a data frame which consists of a list of item, I want to calculate the similarity of rows of that data frame (that will be list in this case) with all other rows based on Fisher exact test. For this purpose, I want to use Pool from python multiprocessing but it seems to be taking approx. same time as the traditional method (i.e using nested for-loop). Is there any way by which I can optimise the code?

Fisher test

def fisher_test(a, b, c, d):
    # do some stuff and return p value

Computation using nested for loop:

%%time
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())


universeSize = 13000
# gq_result_df is a data frame
for i, row in gq_result_df.iterrows():
    for j in range(i, gq_result_df.shape[0]):
        if(i==j):
            continue
        pval = fisher_test(row["module_genes"], gq_result_df.loc[j,"module_genes"], universeSize)
        # pval_matrix is a matrix in which we are storing the result
        pval_matrix[i,j] = pval

using Pool to parallelize the inner loop :

%%time

universeSize = 13000

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

for i, row in range(0, gq_result_df.shape[0]):

    pval = [pool.apply(fisher_test, args = (row["module_genes"], 
                                                         gq_result_df.loc[j,"module_genes"], universeSize)) for j in range(i+1, gq_result_df.shape[0])]
    #print("pval:", pval)
    for j in range(i +1, fish_pval_mat.shape[0]):
        pval_matrix[i, j] = pval[j -i -1]

pool.close()
pool.join()

Computation time when I run outer loop for 119 times

  1. Without parallelization: 13 min
  2. With the parallelization (using Pool): 12 min

How can I optimise the parallelization code to reduce the time? Thanks in Advance


Solution

  • Your problem is the use of Pool.apply() as it is a blocking call. Hence your execution is not parallel but sequential. Pool.apply() blocks until a result is available, making this just another implementation of the nested loop you mentioned. You submit one chunk to a subprocess, wait for it to be processed and then submit another - instead of submitting them all on one go.

    I am not familiar with this particular algorithm and not sure if you can parallelise it - i.e. are chunks to be processed independent or do results of previous chunks affect the consecutive tasks, in which case this does not parallelise.

    If it does parallelise, you could try apply_async() instead. If you do this, then the interface changes a bit as your pval is no longer a list of results but a list of AsyncResult objects, and you need to loop through these and get() the actual result from your worker.