Tags: python, parallel-processing, python-multiprocessing

Parallelization using multiprocessing running processes sequentially instead of simultaneously


I'm trying to parallelize the code below using the multiprocessing module. Everything I try leads to the child processes running one after the other, even though they all have different PIDs. I have tried:

  1. CentOS and macOS
  2. Context as spawn and as fork
  3. Using queues and using pools
  4. Using apply and map, and their async versions
  5. Adding/removing pool.join() and Process.join()

I can't figure out what I am doing wrong.

fs.py:

import numpy as np
from time import sleep
import os

def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

playground.py:

import multiprocessing as mp
import numpy as np
from fs import f


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)

    print('Done!')

Command: python playground.py

Output:

I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]), 
  'dubs': array([ 6,  8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!

Solution

  • When I use p.map() like this (on Linux Mint)

    res = p.map(f, subsets)
    

    then I get

    I am 1337328
    I am 1337325
    I am 1337327
    I am 1337328 and I am finished
    I am 1337325 and I am finished
    I am 1337327 and I am finished
    

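    Maybe you used map() the wrong way, for example as res = [p.map(f, (subset, )) for subset in subsets]. Each map() call blocks until its results are ready, so calling it once per subset runs the subsets one after the other. Pass the whole list in a single call instead: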


    if __name__ == '__main__':
        ctx = mp.get_context('spawn')
        with ctx.Pool(4) as p:
            subsets = [[0, 3], [3, 6], [6, 7]]
            res = p.map(f, subsets)
            print(res)
            
        print('Done!')
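
For context on why the question's code ran sequentially: Pool.apply() blocks until its result is ready, so a list comprehension over p.apply(...) submits only one task at a time. The sketch below contrasts that with apply_async() by timing both. The names slow_double, run_blocking, and run_async are hypothetical helpers introduced for illustration, the 0.2 s delay stands in for the question's sleep(10), and the default start method is assumed rather than 'spawn'.

```python
import multiprocessing as mp
import time
from time import sleep


def slow_double(x, delay=0.2):
    """Stand-in for the question's f(): sleep, then return a value."""
    sleep(delay)
    return x * 2


def run_blocking(pool, items):
    """apply() waits for each result before the next task is submitted."""
    start = time.perf_counter()
    results = [pool.apply(slow_double, (x,)) for x in items]
    return results, time.perf_counter() - start


def run_async(pool, items):
    """apply_async() submits every task first; get() then waits for results."""
    start = time.perf_counter()
    pending = [pool.apply_async(slow_double, (x,)) for x in items]
    results = [job.get() for job in pending]
    return results, time.perf_counter() - start


if __name__ == '__main__':
    with mp.Pool(3) as p:
        print('apply:       %.2f s' % run_blocking(p, [1, 2, 3])[1])
        print('apply_async: %.2f s' % run_async(p, [1, 2, 3])[1])
```

With three workers, the apply() variant should take roughly three times the delay, while the apply_async() variant should take roughly one delay plus pool overhead.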
    

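    For apply_async you would need two for-loops: the first submits all the tasks without waiting, and the second calls get() on each pending result.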

        items = [p.apply_async(f, (subset, )) for subset in subsets]
        res = [x.get() for x in items]
        print(res)
    

    Both loops have to be inside the with block, because leaving the block terminates the pool:


    if __name__ == '__main__':
        ctx = mp.get_context('spawn')
        with ctx.Pool(4) as p:
            subsets = [[0, 3], [3, 6], [6, 7]]
    
            items = [p.apply_async(f, (subset, )) for subset in subsets]
            print(items)
            
            res = [x.get() for x in items]
            print(res)
            
        print('Done!')
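
A minimal sketch of why the results are collected inside the with block: leaving the block calls terminate() on the pool, after which it accepts no more work. The names square and run are hypothetical and used only for illustration, and the default start method is assumed.

```python
import multiprocessing as mp


def square(x):
    return x * x


def run(make_pool):
    """Collect results inside the with block, then probe the pool after exit."""
    rejected = False
    with make_pool(2) as p:
        pending = [p.apply_async(square, (n,)) for n in range(4)]
        results = [job.get() for job in pending]  # pool is still running here
    try:
        p.apply_async(square, (5,))  # the pool was terminated on exit
    except ValueError:
        rejected = True  # a terminated pool refuses new tasks
    return results, rejected


if __name__ == '__main__':
    print(run(mp.Pool))
```

Calling get() on a pending result only after the block has exited is worse than a rejected submission: a task that had not finished when the pool was terminated may never deliver its result.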