I am new to Python and am using the multiprocessing Pool.map() function to run my code in parallel:
import os
from multiprocessing import Pool

def main_function(sample):
    # ...... (reads an input file, does calculations, writes an output file)
    ...

if __name__ == "__main__":
    list_sample_common = os.listdir('/lustre/scratch/Stat/s1155136154/ONT_Panel2')  # WES, ONT_panel, Pacibo_Panel intersection
    list_sample_Pacibo_normal = ['RMH12', 'RMH15', 'RMH20', 'RMH25', 'RMH3.', 'RMH7.', 'RMH9.']  # normal people samples
    list_sample_ONT_cDNA_only = ['RM66T', 'RM68T', 'RM77T']
    sample = list_sample_common + list_sample_Pacibo_normal + list_sample_ONT_cDNA_only

    pool = Pool()
    pool.map(main_function, sample)
    pool.close()
    pool.join()
When I first run it on the cluster, the sub-processes use 500% CPU (since I requested 5 cores on the cluster).
However, after some time, only one core is still running.
Is the reason for this that the main function contains input and output operations? The main process only passes a short list to the worker function, so I am sure the parameter size is not what is slowing things down.
This is only an educated guess, since I do not know the size of sample or the details of the work being performed by your worker function, main_function.

Let's assume that the iterable, sample, that you are passing to the Pool.map method has length 70 and, as you said, your pool size is 5. The map method will break up the 70 tasks into chunksize-sized groups of tasks, distributing these chunks to each of the 5 processes in the pool. If you do not specify the chunksize argument to the map method, it computes the value based on the size of the iterable (70) and the size of the pool (5) as follows:
def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize
So for your values, chunksize will be 4. There will therefore be 17 chunks of tasks of size 4 and a smaller 18th chunk of size 2 distributed among the 5 processes (each column is a queue of tasks for a given process in the pool):
4 4 4 4 4
4 4 4 4 4
4 4 4 4 4
4 4 2
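As a quick sanity check of that arithmetic, the compute_chunksize helper can be exercised directly (repeated here so the snippet is self-contained):

```python
def compute_chunksize(iterable_size, pool_size):
    # approximation of the calculation Pool.map performs when chunksize is None
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize

print(compute_chunksize(70, 5))   # the case above: divmod(70, 20) -> (3, 10), so 4
print(compute_chunksize(16, 4))   # divmod(16, 16) -> (1, 0), so 1
```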
Assuming all tasks take an equal time to process, you can see that after a certain amount of time the last 2 processes will have completed the 12 tasks given to them and will then be idle, leaving you running at only 60% utilization. Eventually the third process will complete its tasks and you will be running at 40%.
But you can see that for the right combination of sample size and pool size, you could end up in a situation where only one process is still running. This is exacerbated by large chunksize values, which are meant to reduce the number of shared-memory accesses required to queue up tasks but can lead to inefficiencies in CPU utilization.

As an experiment, try re-running your program explicitly specifying a chunksize argument of 1 for your map invocation. Unless the number of tasks is a multiple of your pool size and every task takes the same amount of time to complete, you still cannot expect every processor to always have a task to run; in fact, it would be rare not to end up at some point with only one process running a final task. But a chunksize of 1 should reduce the percentage of time that only one processor is busy. Note, however, that a chunksize of 1 is considered inefficient for large iterables.
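Concretely, only the chunksize argument needs to change. A minimal runnable sketch (with a trivial stand-in for your real worker, which does file I/O and calculations):

```python
from multiprocessing import Pool

def main_function(sample_name):
    # stand-in for the real worker, which reads a file, computes, and writes a file
    return sample_name.lower()

if __name__ == '__main__':
    samples = ['RM66T', 'RM68T', 'RM77T']  # a few of the sample names from the question
    with Pool(5) as pool:
        # chunksize=1: each idle worker pulls a single task at a time off the queue
        results = pool.map(main_function, samples, chunksize=1)
    print(results)  # ['rm66t', 'rm68t', 'rm77t']
```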
Demo With Pool of 4 Processes Where the First Process Gets All the Long Running Tasks
Here 16 tasks are submitted with a chunksize of 4 to a pool of size 4, so that the first process gets the first 4 tasks to run, and these are artificially made to take 10 times longer than the rest. We return an identifier associated with the sub-process to demonstrate that one particular process is processing the first 4 tasks:
from multiprocessing import Pool, current_process
import re
import time

def get_id():
    # matches SpawnPoolWorker-N (Windows/macOS) or ForkPoolWorker-N (Linux)
    m = re.search(r'PoolWorker-(\d+)', str(current_process()))
    return int(m[1])

def worker(i):
    R = 10000000
    id = get_id()
    t = time.time()
    # run up the cpu:
    cnt = 0
    for _ in range(R * 10 if i <= 3 else R):
        cnt += 1
    return i, id, time.time() - t

if __name__ == '__main__':
    p = Pool(4)
    # 4 tasks per process:
    results = p.map(worker, range(16), chunksize=4)  # first process gets arguments: 0, 1, 2, 3
    for result in results:
        i, id, elapsed_time = result
        print(f'i={i}, process id={id}, elapsed time={elapsed_time}')
Prints:
i=0, process id=1, elapsed time=6.197998046875
i=1, process id=1, elapsed time=5.889002323150635
i=2, process id=1, elapsed time=5.952000856399536
i=3, process id=1, elapsed time=6.022995948791504
i=4, process id=2, elapsed time=0.6909992694854736
i=5, process id=2, elapsed time=0.8339993953704834
i=6, process id=2, elapsed time=0.5869994163513184
i=7, process id=2, elapsed time=0.7560005187988281
i=8, process id=3, elapsed time=0.7500002384185791
i=9, process id=3, elapsed time=0.7440023422241211
i=10, process id=3, elapsed time=0.7600002288818359
i=11, process id=3, elapsed time=0.7479968070983887
i=12, process id=4, elapsed time=0.7950015068054199
i=13, process id=4, elapsed time=0.7909986972808838
i=14, process id=4, elapsed time=0.8639986515045166
i=15, process id=4, elapsed time=0.7230024337768555
Important Note: What I described above was something of a simplification of what really occurs. There is a single input queue of tasks. Tasks are placed on this queue in chunksize-sized groups, and when a process in the pool is idle it takes the next chunksize-group off the queue to process. I implied in my diagram that these chunks were pre-dealt out to all the processes at the start, but that is not necessarily the case. In my demo above I chose a chunksize that essentially caused all the chunks to be dealt out (the default chunksize would have been 1 if not specified). But it is sometimes even possible for the first process to grab all the chunks if the processing of the tasks is trivial (e.g. just a return None statement), which was not the case in the demo above. The implication of having a single queue with all the chunks is that when the chunksize is 1, a processor should never be unnecessarily idle.
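For comparison, here is a sketch of the same experiment re-run with chunksize=1 (a hypothetical lighter worker; the exact worker numbers and task distribution will vary from run to run, but the four long-running tasks should no longer all land on a single process):

```python
from multiprocessing import Pool, current_process

def worker(i):
    # the first 4 tasks are artificially ~10x longer, as in the demo above
    R = 2_000_000
    cnt = 0
    for _ in range(R * 10 if i <= 3 else R):
        cnt += 1
    # worker names look like 'ForkPoolWorker-2' (Linux) or 'SpawnPoolWorker-2' (Windows/macOS)
    return i, int(current_process().name.split('-')[-1])

if __name__ == '__main__':
    with Pool(4) as p:
        # with chunksize=1, each idle process pulls one task at a time off the shared queue
        results = p.map(worker, range(16), chunksize=1)
    tasks_by_process = {}
    for i, worker_id in results:
        tasks_by_process.setdefault(worker_id, []).append(i)
    for worker_id in sorted(tasks_by_process):
        print(f'process {worker_id} ran tasks {tasks_by_process[worker_id]}')
```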