Search code examples
pythondictionarymultiprocessingpool

python multiprocess using map, but with one sub-process running


I am new to python map() function to achieve parallel code.

def main_function(sample):
    # ......(only input file; calculations; and output file)

if __name__ == "__main__":
    list_sample_common = os.listdir('/lustre/scratch/Stat/s1155136154/ONT_Panel2')# WES,ONT_panel, Pacibo_Panel intersection.
    list_sample_Pacibo_normal = ['RMH12', 'RMH15','RMH20','RMH25','RMH3.','RMH7.','RMH9.']# normal people sample
    list_sample_ONT_cDNA_only = ['RM66T','RM68T','RM77T']
    sample = list_sample_common + list_sample_Pacibo_normal + list_sample_ONT_cDNA_only
    pool=Pool()
    pool.map(main_function,sample)
    pool.close()
    pool.join()

So when I first use it on the cluster, the sub-process are run with 500% CPU (since I apply 5 cores in the cluster).

However, after some time, there is only one core running:
top result

So, the reason for that is the main function containing the output and input operations? And due to the main process only passing the short list to the sub function, I am sure the parameter size won't influence the speed.


Solution

  • This is only an educated guess since I do not know enough about the size of sample and the details of the work being performed by your worker function, main_function

    Lets assume that the iterable, sample, that you are passing to the Pool.map method has length 70 and as you said your pool size is 5. The map method will break up the 70 tasks into chunksize-sized groups of tasks distributing these chunks to each of the 5 processes in the pool. If you do not specify the chunksize argument to the map method it computes the value based on the size of the iterable (70) and the size of the pool (5) as follows:

    def compute_chunksize(iterable_size, pool_size):
        chunksize, remainder = divmod(iterable_size, pool_size * 4)
        if remainder:
            chunksize += 1
        return chunksize
    

    So for your values, chunksize will be 4. So there will be 17 chunks of tasks of size 4 and a smaller 18th chunk of size 2 distributed among the 5 processes (each column is a queue of tasks for a given process in the pool):

    4 4 4 4 4
    4 4 4 4 4
    4 4 4 4 4
    4 4 2
    

    Assuming all tasks took an equal time to process, you could see that after a certain amount of time the last 2 processes would have completed the 12 tasks given to them and would now be idle and you would be running at only 60%. Eventually the third process will complete its tasks and and you will be now running at 40%.

    But you can see for the right combination of sample size and pool size, you could have a situation where you will only be running one process. This is exacerbated with large chunksize values, which are meant to reduce the number of shared memory accesses required to queue up tasks but can lead to some inefficiencies in CPU utilization.

    As an experiment, try re-running your program explicitly specifying a chunksize argument of 1 for your map invocation. Unless the number of tasks is a multiple of your pool size and every task takes the same amount of time to complete, even then you cannot expect every processor to have a task to run. In fact, it would be rare to have a situation where you had something other than only one process left running a final task. But this should reduce the percentage of time that only one processor is busy. But using a chunksize of 1 is considered inefficient for large iterables.

    Demo With Pool of 4 Processes Where the First Process Gets All the Long Running Tasks

    Here 16 tasks are submitted with a chunksize of 4 to a pool size of 4 so that the first process gets the first 4 tasks to run and these are artificially made to be 10 times longer running than the rest. We return an identifier associated with the sub-process to demonstrate that one particular process is processing the first 4 tasks:

    from multiprocessing import Pool, current_process
    import re
    import time
    
    def get_id():
        m = re.search(r'SpawnPoolWorker-(\d+)', str(current_process()))
        return int(m[1])
    
    def worker(i):
        R = 10000000
        id = get_id()
        t = time.time()
        # run up the cpu:
        cnt = 0
        for _ in range(R * 10 if i <= 3 else R):
            cnt += 1
        return i, id, time.time() - t
    
    
    
    if __name__ == '__main__':
        p = Pool(4)
        # 4 tasks per process:
        results = p.map(worker, range(16), chunksize=4) # first process gets arguments: 0, 1, 2, 3
        for result in results:
            i, id, elapsed_time = result
            print(f'i={i}, process id={id}, elapsed time={elapsed_time}')
    

    Prints:

    i=0, process id=1, elapsed time=6.197998046875
    i=1, process id=1, elapsed time=5.889002323150635
    i=2, process id=1, elapsed time=5.952000856399536
    i=3, process id=1, elapsed time=6.022995948791504
    i=4, process id=2, elapsed time=0.6909992694854736
    i=5, process id=2, elapsed time=0.8339993953704834
    i=6, process id=2, elapsed time=0.5869994163513184
    i=7, process id=2, elapsed time=0.7560005187988281
    i=8, process id=3, elapsed time=0.7500002384185791
    i=9, process id=3, elapsed time=0.7440023422241211
    i=10, process id=3, elapsed time=0.7600002288818359
    i=11, process id=3, elapsed time=0.7479968070983887
    i=12, process id=4, elapsed time=0.7950015068054199
    i=13, process id=4, elapsed time=0.7909986972808838
    i=14, process id=4, elapsed time=0.8639986515045166
    i=15, process id=4, elapsed time=0.7230024337768555
    

    Important Note: I may have said something was a simplification of what really occurs. There is a single input queue of tasks. Tasks are placed on this queue in chunks of chunksize groups and a process in the pool when it is idle takes the next chunksize group off the queue to process. I implied in my diagram that these chunks were pre-dealt out to all the processes at the start, but that is not necessarily the case. In my demo above I chose a chunksize that essentially caused all the chunks to be dealt out (the default chunksize would have been 1 if not specified). But sometimes it is even possible for the first process to grab all the chunks if the processing of the tasks is trivial (e.g. just a return None statement), which was not the case in the above demo. The implication of having a single queue with all the chunks is that when the chunksize is 1, a processor should never be unnecessarily idle.