Tags: python, multithreading, parallel-processing, multiprocessing, parallelism-amdahl

How to run a parallel function inside a serial one in Python?


Maybe this is really simple, but I am having a bit of a problem understanding this.

The challenge I have is to execute a child parallel function from inside a mother function. That mother function should run only once while waiting for the results of the child parallel function calls.

I wrote a little example which shows my dilemma.

import random
import string
from joblib import Parallel, delayed
import multiprocessing

def jobToDoById(id):
    #do some other logic based on the ID given
    rand_str  = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for i in range(10))
    return [id, rand_str]


def childFunctionParallel(jobs):
    num_cores = multiprocessing.cpu_count()
    num_cores = num_cores - 1

    if __name__ == '__main__':
        p = Parallel(n_jobs=num_cores)(delayed(jobToDoById)(i) for i in jobs)
        return p

def childFunctionSerial(jobs):
    result = []
    for job in jobs:
        job_result = jobToDoById(job)
        result.append(job_result)
    return result



def motherFunction(countries_cities, doInParallel):
    result = []
    print("Start mainLogic")
    for country in countries_cities:
        city_list = countries_cities[country]
        if doInParallel:
            cities_result = childFunctionParallel(city_list)
        else:
            cities_result = childFunctionSerial(city_list)
        result.append(cities_result)
        # ..... do some more logic

    # ..... do some more logic before returning
    print("End mainLogic")
    return result



print("Start Program")

countries_cities = {
    "United States" : ["Alabama", "Hawaii", "Mississippi", "Pennsylvania"],
    "United Kingdom" : ["Cambridge", "Coventry", "Gloucester", "Nottingham"],
    "France" : ["Marseille", "Paris", "Saint-Denis", "Nanterre", "Aubervilliers"],
    "Denmark" : ["Aarhus", "Slagelse", "Nykøbing F", "Rønne", "Odense"],
    "Australia" : ["Sydney", "Townsville", "Bendigo", "Bathurst", "Busselton"],
}
result_mother = motherFunction(countries_cities, doInParallel=True) # should be executed only once
print(result_mother) 
print("End Program")

If you toggle doInParallel between True and False, you can see the problem. When running with childFunctionSerial(), motherFunction() runs only once. But when running with childFunctionParallel(), motherFunction() is executed multiple times. Both give the same result, but the problem I have is that motherFunction() should be executed only once.

Two questions:

1. How can the program be restructured so that the mother function is executed only once,
and from inside it a parallel job is started, without running multiple instances of the same mother function?
2. How can I pass a second parameter to jobToDoById() besides the id?


Solution

  • Ad 2: Put additional parameter(s) into a tuple & pass ( id, .., )

    This one is simple and commonly used, so one can meet it in many examples.

    def jobToDoById( aTupleOfPARAMs = ( -1, ) ): # jobToDoById(id):
        #                                        #    do some other logic based on the ID given
        if not type( aTupleOfPARAMs ) is tuple:  # FUSE PROTECTION
           return [-1, "call interface violated"]
        if aTupleOfPARAMs[0] == -1:              # FUSE PROTECTION
           return [-1, None]
        # .......................................# GO GET PROCESSED:
        id = aTupleOfPARAMs[0]                   # unpack the id from the call-interface tuple
        rand_str  = ''.join( random.choice( string.ascii_lowercase
                                          + string.ascii_uppercase
                                          + string.digits
                                            )
                                      for i in range( 10 )
                             )
        return [id, rand_str]
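
    Below is a minimal usage sketch of this tuple-packing convention; it assumes the jobToDoById() defined above is in scope, and the city names plus the extra parameter are illustrative stand-ins for the question's data:

    import multiprocessing
    from joblib import Parallel, delayed

    num_cores = max( 1, multiprocessing.cpu_count() - 1 )
    city_list = [ "Cambridge", "Coventry", "Gloucester" ]    # sample ids
    extra     = "United Kingdom"                             # hypothetical 2nd parameter

    if __name__ == '__main__':
        results = Parallel( n_jobs = num_cores )(
                            delayed( jobToDoById )( ( city, extra, ) ) # ONE tuple carries the whole call-interface
                            for city in city_list
                            )
        print( results )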
    

    The first question is a bit harder, yet way more interesting, as system-design's principal differences among [SERIAL], "just"-[CONCURRENT] and true-[PARALLEL] system-scheduling policies of more than one process are not always respected in popular media ( and sometimes even not in Academia ).


    Ad 1: you might get surprised, this will never happen in the current version

    Your code mentions the joblib.Parallel and multiprocessing modules explicitly, yet the documentation says:

    By default Parallel uses the Python multiprocessing module to fork separate Python worker processes to execute tasks concurrently on separate CPUs. This is a reasonable default for generic Python programs but it induces some overhead as the input and output data need to be serialized in a queue for communication with the worker processes.

    There are two implications - your processing will pay dual, [TIME]-domain and [SPACE]-domain overhead costs that may easily become unacceptably huge OVERHEAD COSTS ( and if one has also noticed the words "data" and "serialized" in the citation above, the better ) - for details see the re-formulated Amdahl's Law, as detailed in the section Criticism et al:

    1. The whole Python interpreter, including its data and internal state, is fully forked ( so you get as many copies as instructed, each running just one process-flow, which is done for the sake of not losing performance on a GIL-round-robin fragmentation / an Only-1-runs-All-Others-have-to-wait type of GIL-blocking that steps any 1+ processing-flows if made in threading-based pools etc. )

    2. Besides all the complete Python interpreter + state re-instantiations that have to take place as noted above, ALL <data-IN> + <data-OUT> are also:

      ----------------------------MAIN-starts-to-escape-from-pure-[SERIAL]-processing--
      0: MAIN forks self [1], [2], ..., [n_jobs] - as many copies of self as requested
      ----------------------------MAIN-can-continue-in-"just"-[CONCURRENT]-after:
        1st-Data-IN-SERialised-in-MAIN's-"__main__"
      + 2nd-Data-IN-QUEueed              in MAIN
      + 3rd-Data-IN-DEQueued             [ith_job]s
      + 4th-Data-IN-DESerialised         [ith_job]s
      + ( ...process operated the useful [ith_job]s --as planned... )
      + 5th-Data-OUT-SERialised          [ith_job]s
      + 6th-Data-OUT-QUEued              [ith_job]s
      + 7th-Data-OUT-DEQueued            in-MAIN
      + 8th-Data-OUT-DESerialised-in-MAIN's-"__main__"
      ----------------------------MAIN-can-continue-in-pure-[SERIAL]-processing--------

    which all together always costs non-negligible overhead-time ( for equations and details, kindly see the overhead-strict re-formulation of the net-speedups achievable under these add-on overhead costs - best studied before diving into a refactoring, where your machine would otherwise pay way more than it gets from attempts to ignore these principal and benchmarkable overhead costs; a minimal numerical sketch follows below )
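
    A minimal numerical sketch of one overhead-strict restatement of Amdahl's Law; the overhead-fraction names are illustrative, and the two add-on terms just fold the fork/re-import and SER/QUE/DEQ/DES costs from the scheme above into the classic formula:

    def overhead_strict_speedup( s, N, o_setup = 0.0, o_xfer = 0.0 ):
        """Classic Amdahl's Law, S = 1 / ( s + ( 1 - s ) / N ),
           extended with add-on overhead fractions, both expressed
           as fractions of the original single-process runtime:
           o_setup ... process-instantiation costs ( forks, re-imports )
           o_xfer  ... data-transfer costs ( steps 1-8 above )"""
        return 1.0 / ( s + o_setup + o_xfer + ( 1.0 - s ) / N )

    print( overhead_strict_speedup( s = 0.05, N = 8 ) )                 # ~ 5.9 x : overhead-free optimism
    print( overhead_strict_speedup( s = 0.05, N = 8,
                                    o_setup = 0.20, o_xfer = 0.10 ) )   # ~ 2.1 x : overheads ate the speedup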

    For benchmarking these overhead costs, each separately and in microsecond measurements, tools are available ( yet not all Stack Overflow members felt happy about doing quantitatively robust benchmarking of these ) - just check other posts here on Stack Overflow; a self-contained timing sketch follows below.
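
    The sketch below assumes only that joblib is installed; the tiny payload is deliberate, so that the process-spawn + SER/DES overheads dominate and become visible:

    import time
    from joblib import Parallel, delayed

    def payload( x ):                            # a deliberately tiny job - overheads dominate
        return x * x

    if __name__ == '__main__':
        jobs = list( range( 8 ) )

        t0       = time.perf_counter()
        serial   = [ payload( j ) for j in jobs ]
        t_serial = time.perf_counter() - t0

        t0         = time.perf_counter()
        parallel   = Parallel( n_jobs = 2 )( delayed( payload )( j ) for j in jobs )
        t_parallel = time.perf_counter() - t0

        print( "serial:   {0:12.1f} [us]".format( t_serial   * 1E6 ) )
        print( "parallel: {0:12.1f} [us] # spawn + SER/DES costs show up here".format( t_parallel * 1E6 ) )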

    The second principal limitation of the joblib.Parallel implementation, one that strikes, if not headbangs, into Amdahl's Law, is a resources-real-availability-agnostic optimism, whereas resources-state-aware scheduling is what happens on each real-world system.

    One may expect any high degree of parallel code execution, but unless complex measures are taken on an end-to-end ( top-to-bottom ) system coverage, all processing goes but into a "just"-[CONCURRENT] schedule ( i.e. if resources permit ). This aspect would way extend the footprint of this post, so it was just naively put into the scheme below, showing that if CPU-cores ( and principally any other resource-class ) are not available, the concurrency will never reach the levels of speedup that a resources-availability-agnostic original Amdahl's Law was promising.

    ----------------------------MAIN-starts-escape-from-processing---in-pure-[SERIAL]
      0:                        MAIN forks self                     -in-pure-[SERIAL]
                                     [1]                            -in-pure-[SERIAL]
                                     [2]                            -in-pure-[SERIAL]
                                     ...                            -in-pure-[SERIAL]
                                     [n_jobs] as many copies of self-in-pure-[SERIAL]
                                              as requested          -in-pure-[SERIAL]
      ------------------------MAIN-can-continue-in-"just"-[CONCURRENT]-after-[SERIAL]
    + 1st-Data-IN-SERialised-in-MAIN's-"__main__" job(1), job(2), .., job(n_jobs):[SERIAL]
    + 2nd-Data-IN-QUEueed    in MAIN for all job(1), job(2), .., job(n_jobs):[SERIAL]
    + 3rd-Data-IN-DEQueued              [ith_job]s:       "just"-[CONCURRENT]||X||X||
    + 4th-Data-IN-DESerialised          [ith_job]s:       "just"-[CONCURRENT]|X||X|||
    + ( ...process operated the useful  [ith_job]s-<The PAYLOAD>-planned... )||X|||X|
    + 5th-Data-OUT-SERialised           [ith_job]s:       "just"-[CONCURRENT]||||X|||
    + 6th-Data-OUT-QUEued               [ith_job]s:       "just"-[CONCURRENT]|X|X|X||
    + 7th-Data-OUT-DEQueued     in-MAIN <-- job(1), job(2), .., job(n_jobs):[SERIAL]
    + 8th-Data-OUT-DESerialised-in-MAIN's-"__main__" job(1), job(2), .., job(n_jobs):[SERIAL]
    -------------------------------MAIN-can-continue-processing------in-pure-[SERIAL]
    ...                                                             -in-pure-[SERIAL]
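
    For completeness on the practical side of question 1: a conventional restructuring - the common multiprocessing guard pattern, not anything specific to the answer above - keeps motherFunction() from re-running when spawned worker processes re-import the main module. A minimal sketch, assuming the jobToDoById() / childFunctionParallel() / motherFunction() definitions from the question, with the __name__ test removed from inside childFunctionParallel():

    if __name__ == '__main__':            # worker processes re-import this module,
        print( "Start Program" )          # but only the parent executes this branch
        countries_cities = {
            "United Kingdom" : [ "Cambridge", "Coventry", "Gloucester", "Nottingham" ],
            "Denmark"        : [ "Aarhus", "Slagelse", "Odense" ],
        }
        result_mother = motherFunction( countries_cities, doInParallel = True )
        print( result_mother )
        print( "End Program" )

    With the guard at module level, childFunctionParallel() can call Parallel(...) unconditionally, and motherFunction() executes exactly once, in the parent process.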