python multiprocessing python-multiprocessing concurrent.futures abaqus

Using concurrent.futures.ProcessPoolExecutor to run simultaneous & independent ABAQUS models


I want to run a total of nAnalysis=25 Abaqus models, each using X cores, and I can run nParallelLoops=5 of these models concurrently. Whenever one of the 5 running analyses finishes, another analysis should start, until all nAnalysis are completed.

I implemented the code below based on the solutions posted in 1 and 2. However, I am missing something: all nAnalysis models try to start at "once", the code deadlocks, and no analysis ever completes, since many of them may want to use the same cores that an already started analysis is using.

  1. Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs
  2. How to parallelize this nested loop in Python that calls Abaqus
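
def runABQfile(*args):
    import subprocess

    # inpFile and jobVars are unpacked but unused in this function.
    inpFile, path, jobVars = args

    prcStr1 = path + '/runJob.sh'

    # check_call blocks until the script exits and raises CalledProcessError on a non-zero exit code.
    subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)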

def safeABQrun(*args):
    # Catch any failure so that one bad analysis does not stop the others.
    try:
        runABQfile(*args)
    except Exception as e:
        print("Thread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import wait

    # nParallelLoops, nAnalysis, inpFiles, aPath and jobVars are not defined here
    # and are assumed to be module-level globals.
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        # Submit all analyses up front; max_workers caps the number of worker processes.
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis))  # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

The only way I have been able to run this so far is to modify errFunction to use exactly 5 analyses at a time, as below. However, with this approach one analysis in a group (each ProcessPoolExecutor call) sometimes takes much longer than the other 4, so the next group of 5 does not start even though resources (cores) are available. Ultimately this increases the time needed to complete all 25 models.

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait    

    # Group 1
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,5))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 2
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5,10))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 3
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10,15))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 4
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15,20))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20,25))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
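
The cost of the grouped approach above can be estimated with a small simulation. This is only a sketch: the function names and the run times are hypothetical, and it ignores any constraint about which cores or nodes a given analysis must use. It compares fixed groups, where each group waits for its slowest job, with a pool in which a free worker immediately takes the next job.

```python
import heapq


def batched_makespan(durations, n_parallel):
    """Total time when jobs run in fixed groups of n_parallel.

    Each group lasts as long as its slowest job.
    """
    total = 0
    for start in range(0, len(durations), n_parallel):
        total += max(durations[start:start + n_parallel])
    return total


def pooled_makespan(durations, n_parallel):
    """Total time when a free worker immediately takes the next job in order."""
    workers = [0] * n_parallel  # time at which each worker becomes free
    heapq.heapify(workers)
    for d in durations:
        free_at = heapq.heappop(workers)  # earliest available worker
        heapq.heappush(workers, free_at + d)
    return max(workers)


if __name__ == "__main__":
    durations = [4, 1, 1, 6, 5, 9, 5, 6, 6, 8]  # hypothetical run times
    print("grouped: %d" % batched_makespan(durations, 5))
    print("pooled:  %d" % pooled_makespan(durations, 5))
```

For these made-up run times the grouped schedule takes 15 time units and the pooled one 14; the gap grows when one job in a group is much slower than the rest.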

I tried using the as_completed function, but that does not seem to work either.

Could you please help me figure out the proper parallelization so that I can run nAnalysis models, with nParallelLoops always running concurrently? Your help is appreciated. I am using Python 2.7.

Bests, David P.


UPDATE JULY 30/2016:

I introduced a loop in safeABQrun, which manages the 5 different "queues". The loop is necessary to prevent an analysis from trying to run on a node while another one is still running there. The analyses are pre-configured to run on one of the requested nodes before any actual analysis starts.

def safeABQrun(*list_args):
    # Each call handles one "queue": the analyses assigned to one node, run one after another.
    inpFiles,paths,jobVars = list_args

    nA = len(inpFiles)
    for k in range(0,nA):
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Thread Error: %s runABQfile(*%r)" % (e, args))

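def errFunction(ppos, *args):
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed

    # list_args is not defined in this snippet; it is assumed to be an iterable of
    # (inpFiles, paths, jobVars, k) tuples, one per queue.
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
                print('%r generated an exception: %s' % (futures[f], f.exception()))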
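
The update does not show how list_args is built. One possible construction is sketched below, under the assumption (not stated in the post) that analysis i is pre-configured for node i % nParallelLoops. The helper name build_chains is hypothetical. It groups the inputs into one chain per node, in the (inpFiles, paths, jobVars, k) shape that the comprehension in errFunction unpacks.

```python
def build_chains(inpFiles, aPath, jobVars, nParallelLoops):
    """Group analyses into one chain per node.

    ASSUMPTION: analysis i is configured to run on node i % nParallelLoops.
    Returns a list of (inpFiles, paths, jobVars, k) tuples, one per node k.
    """
    chains = []
    for k in range(nParallelLoops):
        # Indices of the analyses assigned to node k: k, k + n, k + 2n, ...
        idx = list(range(k, len(inpFiles), nParallelLoops))
        chains.append(([inpFiles[i] for i in idx],
                       [aPath[i] for i in idx],
                       [jobVars[i] for i in idx],
                       k))
    return chains


if __name__ == "__main__":
    # Hypothetical file names, paths and job variables for 25 analyses.
    names = ["job%02d.inp" % i for i in range(25)]
    paths = ["/tmp/run%02d" % i for i in range(25)]
    jvars = [{"id": i} for i in range(25)]
    list_args = build_chains(names, paths, jvars, 5)
    for inpF, aPth, jVrs, k in list_args:
        print("node %d runs %s" % (k, ", ".join(inpF)))
```

With this layout each chain runs its analyses sequentially on one node, so a slow analysis delays only the analyses queued behind it on that node rather than a whole group.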

Solution

  • It looks OK to me, but I can't run your code as-is. How about trying something vastly simpler, then adding things to it until "a problem" appears? For example, does the following show the kind of behavior you want? It does on my machine, but I'm running Python 3.5.2. You say you're running 2.7, but concurrent.futures didn't exist in Python 2, so if you are using 2.7, you must be running someone's backport of the library, and perhaps the problem is in that. Trying the following should help to answer whether that's the case:

    from concurrent.futures import ProcessPoolExecutor, wait, as_completed
    
    def worker(i):
        from time import sleep
        from random import randrange
        s = randrange(1, 10)
        print("%d started and sleeping for %d" % (i, s))
        sleep(s)
    
    if __name__ == "__main__":
        nAnalysis = 25
        nParallelLoops = 5
        with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
            futures = dict((executor.submit(worker, k), k) for k in range(nAnalysis))
            for f in as_completed(futures):
                print("got %d" % futures[f])
    

    Typical output:

    0 started and sleeping for 4
    1 started and sleeping for 1
    2 started and sleeping for 1
    3 started and sleeping for 6
    4 started and sleeping for 5
    5 started and sleeping for 9
    got 1
    6 started and sleeping for 5
    got 2
    7 started and sleeping for 6
    got 0
    8 started and sleeping for 6
    got 4
    9 started and sleeping for 8
    got 6
    10 started and sleeping for 9
    got 3
    11 started and sleeping for 6
    got 7
    12 started and sleeping for 9
    got 5
    ...
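
Once the simple test behaves as expected, a next step could be to replace the sleep with a real external command. The sketch below is an alternative that neither the question nor the answer proposes: it uses ThreadPoolExecutor instead of ProcessPoolExecutor, on the reasoning that each worker only waits for a child process. The name run_all and the placeholder command are hypothetical. It also records the peak number of jobs running at once so the concurrency cap can be checked.

```python
import subprocess
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(commands, n_parallel):
    """Run each command as a child process, at most n_parallel at a time.

    Returns (exit codes keyed by command index, peak number of concurrent jobs).
    """
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def run_job(cmd):
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        try:
            # Blocks this worker thread until the child process exits.
            return subprocess.call(cmd)
        finally:
            with lock:
                state["running"] -= 1

    codes = {}
    with ThreadPoolExecutor(max_workers=n_parallel) as executor:
        futures = dict((executor.submit(run_job, cmd), k)
                       for k, cmd in enumerate(commands))
        for f in as_completed(futures):
            codes[futures[f]] = f.result()
    return codes, state["peak"]


if __name__ == "__main__":
    # Placeholder command standing in for a real job script.
    cmd = [sys.executable, "-c", "import time; time.sleep(0.2)"]
    codes, peak = run_all([cmd] * 6, n_parallel=2)
    print("exit codes: %r, peak concurrency: %d" % (codes, peak))
```

If the peak stays at or below n_parallel here but the real jobs still appear to start all at once, one thing to check is whether the job script returns before the analysis itself has finished.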