python subprocess concurrent.futures process-pool

Python concurrent.futures using subprocess, running several Python scripts


I want to run several Python scripts at the same time using concurrent.futures. The serial version of my code looks for a specific Python file in each folder and executes it.

import os
import subprocess
from glob import glob
import concurrent.futures as cf

FileList = []
start_dir = os.getcwd()
pattern = "Read.py"

for dir, _, _ in os.walk(start_dir):
    FileList.extend(glob(os.path.join(dir, pattern)))

i = 0
for file in FileList:
    dir = os.path.dirname(file)
    dirname1 = os.path.basename(dir)
    print(dirname1)
    i = i + 1
    Str = 'python ' + file
    print(Str)
    completed_process = subprocess.run(Str)

For the parallel version of my code:

def Python_callback(future):
    print(future.run_type, future.jid)
    return "One Folder finished executing"

def Python_execute():
    from concurrent.futures import ProcessPoolExecutor as Pool
    args = FileList
    pool = Pool(max_workers=1)
    future = pool.submit(subprocess.call, args, shell=1)
    future.run_type = "run_type"
    future.jid = FileList
    future.add_done_callback(Python_callback)
    print("Python executed")

if __name__ == '__main__':
    import subprocess
    Python_execute()

The issue is that I am not sure how to pass each element of FileList to a separate CPU.

Thanks for your help in advance


Solution

  • The smallest change is to use submit once for each element, instead of once for the whole list:

    futures = []
    for file in FileList:
        future = pool.submit(subprocess.call, file, shell=1)
        # set run_type and jid and add the callback here, as in your original code
        futures.append(future)
    

    The futures list is only necessary if you want to do something with the futures—wait for them to finish, check their return values, etc.

    Meanwhile, you're explicitly creating the pool with max_workers=1. Not surprisingly, this means you'll only get 1 worker child process, so it'll end up waiting for one subprocess to finish before grabbing the next one. If you want to actually run them concurrently, remove that max_workers and let it default to one per core (or pass max_workers=8 or some other number that's not 1, if you have a good reason to override the default).
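
    A rough sketch of the per-file submit described above, assuming each entry of the list is a path to a Python script. The helper name submit_each, the use of sys.executable, and the argument-list form of the command (instead of the shell string in the question) are illustrative choices, not part of the original code. Leaving max_workers out lets ProcessPoolExecutor size the pool from the number of CPUs available.

```python
import subprocess
import sys
from concurrent.futures import ProcessPoolExecutor


def submit_each(pool, scripts):
    """Submit one subprocess.call per script and return the futures in order."""
    futures = []
    for script in scripts:
        # One submit per script, not one submit for the whole list.
        # Assumption: run each script with the interpreter running this code.
        futures.append(pool.submit(subprocess.call, [sys.executable, script]))
    return futures


if __name__ == "__main__":
    # Hypothetical input: script paths passed on the command line.
    scripts = sys.argv[1:]
    # No max_workers argument, so the pool is not limited to a single worker.
    with ProcessPoolExecutor() as pool:
        futures = submit_each(pool, scripts)
        for script, future in zip(scripts, futures):
            print(script, "exited with", future.result())
```

    Here future.result() is whatever subprocess.call returned, which is the exit code of that script.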


    While we're at it, there are a lot of ways to simplify what you're doing:

    • Do you really need multiprocessing here? If you need to communicate with each subprocess, that can be painful to do in a single thread—but threads, or maybe asyncio, will work just as well as processes here.
    • More to the point, it doesn't look like you actually need to do anything but launch each process and wait for it to finish, and that can be done in simple, synchronous code.
    • Why are you building a string and using shell=1 instead of just passing a list and not using the shell? Using the shell unnecessarily creates overhead, safety problems, and debugging annoyances.
    • You really don't need the jid on each future—it's just the list of all of your invocation strings, which can't be useful. What might be more useful is some kind of identifier, or the subprocess return code, or… probably lots of other things, but they're all things that could be done by reading the return value of subprocess.call or a simple wrapper.
    • You really don't need the callback either. If you just gather all the futures in a list and as_completed it, you can print the results as they show up more simply.
    • If you do both of the above, you've got nothing left but a pool.submit inside the loop—which means you can replace the entire loop with pool.map.
    • You rarely need, or want, to mix os.walk and glob. When you actually have a glob pattern, apply fnmatch over the files list from os.walk. But here, you're just looking for a specific filename in each dir, so really, all you need to filter on is file == 'Read.py'.
    • You're not using the i in your loop. But if you do need it, it's better to do for i, file in enumerate(FileList): than to do for file in FileList: and manually increment an i.
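
    Taken together, several of the simplifications above might look like the following sketch. It assumes threads are acceptable (each worker only waits on a child process), filters on the filename directly instead of mixing os.walk and glob, passes the command as a list so no shell is needed, and replaces the submit loop with pool.map. The function names find_scripts, run_script and run_all are made up for illustration, and sys.executable stands in for the 'python' string in the question.

```python
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def find_scripts(start_dir, filename="Read.py"):
    """Return the path of every file named `filename` under start_dir."""
    return [
        os.path.join(dirpath, filename)
        for dirpath, _dirnames, filenames in os.walk(start_dir)
        if filename in filenames
    ]


def run_script(path):
    """Run one script with the current interpreter and return its exit code."""
    # Argument list instead of a command string, so no shell is involved.
    return subprocess.call([sys.executable, path])


def run_all(start_dir, max_workers=None):
    """Run every matching script concurrently; return {path: exit code}."""
    scripts = find_scripts(start_dir)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as `scripts`.
        return dict(zip(scripts, pool.map(run_script, scripts)))
```

    Calling run_all(os.getcwd()) would then return a dict mapping each Read.py path to its exit code, which covers what the callback and the jid attribute were trying to report.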