Tags: python, numpy, multiprocessing, pool

Using pool for multiprocessing in Python (Windows)


I have to do my study in a parallel way to run it much faster. I am new to the multiprocessing library in Python and could not yet make it run successfully. Here, I am investigating whether each pair of (origin, target) remains at certain locations between various frames of my study. Several points:

  1. It is one function that I want to run faster (it is not several processes).
  2. The process is performed sequentially; that is, each frame is compared with the previous one.
  3. This code is a much simplified form of the original code. The code outputs a residence_list.
  4. I am using Windows OS.

Can someone check the code (the multiprocessing section) and help me improve it to make it work? Thanks.

import numpy as np
from multiprocessing import Pool, freeze_support


def Main_Residence(total_frames, origin_list, target_list):
    Previous_List = {}
    residence_list = []

    for frame in range(total_frames):     #Each frame

        Current_List = {}               #Dict of pairs and their residence counts for this frame
        for origin in range(origin_list):

            for target in range(target_list):
                Pair = (origin, target)         #Each pair

                if Pair in Current_List.keys():     #If already considered, continue
                    continue
                else:
                    if origin == target:
                        if (Pair in Previous_List.keys()):            #If remained from the previous frame, add residence
                            print "Origin_Target remained: ", Pair
                            Current_List[Pair] = (Previous_List[Pair] + 1)
                        else:                                           #If new, add it to the current
                            Current_List[Pair] = 1

        for pair in Previous_List.keys():                        #Add those that exited from residence to the list
            if pair not in Current_List.keys():
                residence_list.append(Previous_List[pair])

        Previous_List = Current_List
    return residence_list

if __name__ == '__main__':
    pool = Pool(processes=5)
    Residence_List = pool.apply_async(Main_Residence, args=(20, 50, 50))
    print Residence_List.get(timeout=1)
    pool.close()
    pool.join()
    freeze_support()

Residence_List = np.array(Residence_List) * 5

Solution

  • Multiprocessing does not make sense in the context you are presenting here. You are creating five subprocesses (and three threads belonging to the pool, managing workers, tasks and results) to execute one function once. All of this is coming at a cost, both in system resources and execution time, while four of your worker processes don't do anything at all. Multiprocessing does not speed up the execution of a function. The code in your specific example will always be slower than plainly executing Main_Residence(20, 50, 50) in the main process.

    For multiprocessing to make sense in such a context, your work at hand would need to be broken down into a set of homogeneous tasks that can be processed in parallel, with their results potentially being merged later.

    As an example (not necessarily a good one), if you want to calculate the largest prime factors for a sequence of numbers, you can delegate the task of calculating that factor for any specific number to a worker in a pool. Several workers would then do these individual calculations in parallel:

    from datetime import datetime
    from multiprocessing import Pool


    def largest_prime_factor(n):
        # returns the original number together with its largest prime factor
        p = n
        i = 2
        while i * i <= n:
            if n % i:
                i += 1
            else:
                n //= i
        return p, n
    
    
    if __name__ == '__main__':
        pool = Pool(processes=3)
        start = datetime.now()
        # this delegates half a million individual tasks to the pool, i.e. 
        # largest_prime_factor(0), largest_prime_factor(1), ..., largest_prime_factor(499999)      
        pool.map(largest_prime_factor, range(500000))
        pool.close()
        pool.join()
        print "pool elapsed", datetime.now() - start
        start = datetime.now()
        # same work just in the main process
        [largest_prime_factor(i) for i in range(500000)]
        print "single elapsed", datetime.now() - start
    

    Output:

    pool elapsed 0:00:04.664000
    single elapsed 0:00:08.939000
    

    (the largest_prime_factor function is taken from @Stefan in this answer)

    As you can see, the pool is only roughly twice as fast as single process execution of the same amount of work, all while running in three processes in parallel. That's due to the overhead introduced by multiprocessing/the pool.

    So, you stated that the code in your example has been simplified. You'll have to analyse your original code to see if it can be broken down into homogeneous tasks that can be passed to your pool for processing. If that is possible, using multiprocessing might help you speed up your program. If not, multiprocessing will likely cost you time rather than save it.

    Edit:
    Since you asked for suggestions on the code: I can hardly say anything about your function. You said yourself that it is just a simplified example to provide an MCVE (much appreciated, by the way! Most people don't take the time to strip their code down to its bare minimum). Requests for a code review are anyway better suited over at Code Review.

    Play around a bit with the available methods of task delegation. In my prime factor example, using apply_async came with a massive penalty. Execution time increased ninefold, compared to using map. But my example is using just a simple iterable, yours needs three arguments per task. This could be a case for starmap, but that is only available as of Python 3.3.
    Anyway, the structure/nature of your task data basically determines the correct method to use.
    I did some q&d testing with multiprocessing your example function. The input was defined like this:

    inp = [(20, 50, 50)] * 5000  # that makes 5000 tasks against your Main_Residence
    

    I ran that in Python 3.6 in three subprocesses, with your function unaltered except for the removal of the print statement (I/O is costly). I used starmap, apply, starmap_async and apply_async, and also iterated through the results each time to account for the blocking get() on the async results.
    Here's the output:

    starmap elapsed 0:01:14.506600
    apply elapsed 0:02:11.290600
    starmap async elapsed 0:01:27.718800
    apply async elapsed 0:01:12.571200
    # btw: 5k calls to Main_Residence in the main process looks as bad 
    # as using apply for delegation
    single elapsed 0:02:12.476800
    

    As you can see, the execution times differ, although all four methods do the same amount of work; the apply_async you picked appears to be the fastest method.
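
    For reference, here is a minimal sketch of how such a delegation might look for starmap and apply_async (this only mirrors the idea of the test above, not the exact script that was timed; the my_module import is a placeholder for wherever your function actually lives):

    from datetime import datetime
    from multiprocessing import Pool

    # placeholder import: on Windows the worker processes are spawned and re-import
    # the module, so Main_Residence must live in an importable module, not in an
    # interactive session
    from my_module import Main_Residence

    if __name__ == '__main__':
        inp = [(20, 50, 50)] * 5000

        with Pool(processes=3) as pool:
            start = datetime.now()
            # starmap unpacks each (total_frames, origin_list, target_list) tuple
            # into the three positional arguments of Main_Residence
            results = pool.starmap(Main_Residence, inp)
            print("starmap elapsed", datetime.now() - start)

            start = datetime.now()
            # apply_async returns one AsyncResult per task; get() blocks until
            # that particular result is available
            handles = [pool.apply_async(Main_Residence, args=task) for task in inp]
            results = [h.get() for h in handles]
            print("apply async elapsed", datetime.now() - start)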

    Coding Style. Your code looks quite ... unconventional :) You use Capitalized_Words_With_Underscore for your names (both function and variable names), which is pretty much a no-no in Python. Also, assigning the name Previous_List to a dictionary is ... questionable. Have a look at PEP 8, especially the section Naming Conventions, to see the commonly accepted coding style for Python.
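
    As a tiny illustration (the snake_case names below are suggestions of mine, not taken from your code), the opening lines could read:

    def main_residence(total_frames, origin_count, target_count):
        previous_counts = {}   # it maps a pair to its residence count, so a dict-style name fits
        residence_list = []
        # ... rest of the function unchanged, just with lower_case names throughout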

    Judging by the way your print looks, you are still using Python 2. I know that in corporate or institutional environments that's sometimes all you have available. Still, keep in mind that the clock for Python 2 is ticking.
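
    If you do want to nudge the code towards Python 3, the print statement is one of the easy things to change right away; with the __future__ import the same line works under both versions (a generic illustration, not your exact code):

    # Python 2 statement form: print "Origin_Target remained: ", Pair
    from __future__ import print_function

    pair = (3, 3)
    print("Origin_Target remained:", pair)   # function form, valid in Python 2 and 3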