Tags: python, performance, multiprocessing, batch-processing, jobs

How to keep n processes running on a list of jobs that vary in length in Python 2.7?


So, a little background on what I am trying to do: below I have written some code that creates jobs in batches of 4 and runs them. These jobs involve a lot of data, so I want to limit how many are running at once. Depending on the contents of the data, these jobs can vary in execution time from about 2 to 15 minutes. The batching code below works.

[Image: current code execution]

[Image: desired code execution]

Again, these jobs vary greatly in how long they take to run, so running them in batches of 4 (or n) can waste time. For example, given 4 jobs where 3 take 2 minutes and the other takes 10, there will be 8 minutes with only one process running.

So my question: is there a way to supply a list of jobs to some function/class/whatever so that the number of running processes stays constant?

        # (snippet from inside a larger method; `multiprocessing` is
        # imported at module level)
        manager = multiprocessing.Manager()
        return_dict = manager.dict()  # shared dict for collecting per-well results

        jobs = []
        numOfProc = 4

        for api in WellAPIs:  # iterate over individual wells in a file
            p = multiprocessing.Process(target=self.processWell,
                                        args=(df, CreatingPrediction, cache, df,
                                              newColList, clustringPredictionColName,
                                              return_dict, lock))
            jobs.append(p)
            p.start()

            numOfActiveProc = len(jobs)
            if numOfProc <= numOfActiveProc:
                # batch is full: wait for the whole batch before starting more
                for proc in jobs:
                    proc.join()
                jobs = []
            print "jobs: " + str(jobs)

        # join whatever is left over from the final, partial batch
        for proc in jobs:
            proc.join()
        jobs = []

        for parDF in return_dict.values():
            outDf = outDf.append(parDF)  # collect worker output into one DataFrame

Solution

  • You're probably looking for Pool

    from __future__ import print_function  # NEEDED ON 2.7 SO `print` CAN BE A CALLBACK
    from multiprocessing import Pool

    def f(x):
        return x*x

    if __name__ == '__main__':
        p = Pool(5)   # NUMBER OF PROCESSES
        print(p.map(f, [1, 2, 3]))   # APPLIES `f` TO EACH ELEMENT
        # APPEND THIS TO MY TODO LIST AND PRINT IT WHEN IT'S DONE
        p.apply_async(f, (10,), callback=print)
        print(p.apply_async(f, (10,)).get())
        p.close()
        p.join()   # WAIT SO THE ASYNC CALLBACK HAS A CHANCE TO FIRE

    You can create a Pool(4), then use map with any iterable; map returns once the iterable is consumed. Alternatively, you can use apply_async, which delivers each result either through a callback or through .get(). A sketch adapting this to the question's loop follows.
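
    Applied to the loop in the question, a minimal sketch might look like the one below. It assumes the per-well work can move into a module-level function (the hypothetical process_well standing in for self.processWell, and well_apis standing in for WellAPIs), since Python 2.7 cannot pickle a bound method for the worker processes. It uses imap_unordered, which yields each result as soon as its job finishes:

    from __future__ import print_function
    import random
    import time
    from multiprocessing import Pool

    def process_well(api):
        # Hypothetical stand-in for self.processWell; it must live at
        # module level so Python 2.7 can pickle it for the workers.
        time.sleep(random.uniform(0.1, 1.0))  # simulate a variable-length job
        return api, api * api                 # (key, result) pair

    if __name__ == '__main__':
        well_apis = range(12)   # stand-in for WellAPIs
        pool = Pool(4)          # never more than 4 jobs running at once
        results = {}
        # Each finished worker immediately picks up the next well, so a
        # 10-minute job never leaves the other three workers idle.
        for api, value in pool.imap_unordered(process_well, well_apis):
            results[api] = value
        pool.close()
        pool.join()
        print(results)

    Because the pool hands a new job to a worker the moment it finishes the previous one, the idle time from the fixed-batch approach disappears.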