I have a column in a data frame whose entries are lists of items. I want to compute, for every row of that data frame, its similarity to all other rows based on Fisher's exact test applied to those lists. For this purpose I tried Pool from Python's multiprocessing module, but it takes approximately the same time as the traditional method (a nested for loop). Is there any way I can optimise the code?
Fisher test
def fisher_test(a, b, universeSize):
    # build the contingency table from the two gene lists and the
    # universe size, do some stuff and return the p value
Computation using a nested for loop:
%%time
universeSize = 13000
# gq_result_df is a data frame; pval_matrix stores the results
for i, row in gq_result_df.iterrows():
    for j in range(i + 1, gq_result_df.shape[0]):
        pval = fisher_test(row["module_genes"], gq_result_df.loc[j, "module_genes"], universeSize)
        pval_matrix[i, j] = pval
Using Pool to parallelize the inner loop:
%%time
import multiprocessing as mp

universeSize = 13000
pool = mp.Pool(mp.cpu_count())
for i, row in gq_result_df.iterrows():
    pval = [pool.apply(fisher_test, args=(row["module_genes"],
            gq_result_df.loc[j, "module_genes"], universeSize))
            for j in range(i + 1, gq_result_df.shape[0])]
    #print("pval:", pval)
    for j in range(i + 1, pval_matrix.shape[0]):
        pval_matrix[i, j] = pval[j - i - 1]
pool.close()
pool.join()
Computation time when I run the outer loop 119 times is roughly the same in both versions.
How can I optimise the parallelization code to reduce the time? Thanks in advance.
Your problem is the use of Pool.apply(), which is a blocking call: it does not return until the result is available, so your execution is sequential rather than parallel, making this just another implementation of the nested loop you mentioned. You submit one task to a worker process, wait for it to be processed, and only then submit the next, instead of submitting them all in one go.
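To illustrate the blocking behaviour, here is a minimal sketch with a toy square function (a stand-in, not the asker's fisher_test): even with a pool of two workers, the calls below run strictly one after another, because each apply() waits for its result before the next call is made.

    import multiprocessing as mp

    def square(x):
        return x * x

    if __name__ == "__main__":
        with mp.Pool(2) as pool:
            # apply() blocks until its result is ready, so despite the pool
            # these three tasks execute sequentially, one per call
            results = [pool.apply(square, args=(x,)) for x in range(3)]
            print(results)  # [0, 1, 4]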
I am not familiar with this particular algorithm and am not sure whether you can parallelise it, i.e. whether the chunks to be processed are independent or the results of previous chunks affect consecutive tasks, in which case it does not parallelise.
If it does parallelise, you could try apply_async() instead. The interface then changes a bit: your pval is no longer a list of results but a list of AsyncResult objects, and you need to loop through these and call get() to retrieve the actual result from each worker.
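A runnable sketch of that pattern, using a toy fisher_test stand-in and a small list in place of the asker's module_genes column (the real test would build a contingency table and compute a Fisher exact p value): all inner-loop tasks for a given i are submitted at once with apply_async(), and only afterwards are the results collected with get().

    import multiprocessing as mp

    def fisher_test(a, b, universe_size):
        # toy placeholder: the real function would run a Fisher exact test
        return (a * b) % universe_size / universe_size

    if __name__ == "__main__":
        n = 6
        universe_size = 13000
        data = list(range(1, n + 1))               # stand-in for the module_genes column
        pval_matrix = [[0.0] * n for _ in range(n)]

        with mp.Pool(mp.cpu_count()) as pool:
            for i in range(n):
                # submit every inner-loop task immediately; apply_async
                # returns an AsyncResult without waiting for the worker
                async_results = [
                    pool.apply_async(fisher_test, args=(data[i], data[j], universe_size))
                    for j in range(i + 1, n)
                ]
                # now collect; get() blocks until each worker has finished
                for j, res in zip(range(i + 1, n), async_results):
                    pval_matrix[i][j] = res.get()

        print(pval_matrix[0][1])

Because submission and collection are decoupled, the workers can process the whole batch concurrently instead of one task at a time. Note that for very cheap per-task work the inter-process communication overhead can still dominate, so chunking several pairs per task (e.g. one task per row i) may help further.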