Tags: python, multiprocessing, python-multiprocessing

Job queue with dependencies with python multiprocessing


I have a function and a list of jobs:

jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)], 
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2,3)]]

I would like to apply the function to the arguments (the first tuple of each job) in parallel while respecting the dependencies on earlier jobs (the second tuple, which holds the indices of the jobs that must finish first). For example, the cat job cannot start until the dog job has finished. However, I don't want to occupy a core just waiting for a job's dependencies to finish. Instead, I want to move on to another job that can run immediately, so that all cores are kept busy whenever possible. Any tips? Many thanks!


Solution

  • The comment posted by Charchit Agarwal is potentially one way. The problem is: if a job has multiple dependencies and those dependencies complete in different processes, how do these "super" functions communicate with one another? So here is another method, which uses a job-completion callback to submit new jobs as their dependencies complete:

    I would first process your jobs list to create the following data structures:

    1. starts_immediately: A list of job numbers (i.e. indices of jobs) that can be submitted immediately since they have no dependencies.
    2. depends_on: A dictionary of sets. The key is a job number and its value is a set of jobs that must complete before this job can be submitted.
    3. precedes: A dictionary of sets. The key is a job number and its value is a set of job numbers that cannot be started until this job completes.
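
    The bookkeeping above assumes the dependencies form a directed acyclic graph: if two jobs depended on each other, neither would ever be submitted and the program would wait forever. Here is a minimal sketch of a sanity check that could be run before submitting anything. It assumes Python 3.9+ for the standard-library graphlib module, and check_dependencies is a hypothetical helper name, not something from the question:

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+


def check_dependencies(jobs):
    """Return {job_number: set of dependencies}, or raise ValueError.

    Hypothetical helper: rejects dependencies on jobs that do not exist
    and dependency cycles, both of which would stall the scheduler.
    """
    graph = {}
    for job_number, (_, dependency) in enumerate(jobs):
        deps = set(dependency or ())  # None means "no dependencies"
        unknown = {d for d in deps if not 0 <= d < len(jobs)}
        if unknown:
            raise ValueError(
                f'job {job_number} depends on unknown jobs {sorted(unknown)}')
        graph[job_number] = deps
    try:
        # prepare() raises CycleError if the graph contains a cycle
        TopologicalSorter(graph).prepare()
    except CycleError as e:
        # The detected cycle is the second element of the exception's args
        raise ValueError(f'cyclic dependency: {e.args[1]}') from e
    return graph
```

    Calling check_dependencies(jobs) at the top of main would turn a silent hang into an immediate error.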

    We then arrange that, whenever a job completes, we determine which jobs, if any, can now be submitted. For this we use a job-completion callback function:

    DEBUG = True
    
    def worker(tpl):
        import time
    
        print('Starting work on:', tpl, flush=True)
        time.sleep(.5) # Simulate work being done
        ...
        print('Completed work on:', tpl, flush=True)
    
    def main(jobs):
        from multiprocessing import Pool
        from collections import defaultdict
        from functools import partial
        from threading import Event
    
        starts_immediately = []
        depends_on = {}
        precedes = defaultdict(set)
        for job_number, job in enumerate(jobs):
            _, dependency = job
            if dependency is None:
                starts_immediately.append(job_number)
            else:
                depends_on[job_number] = set(dependency)
                for job_number_2 in dependency:
                    precedes[job_number_2].add(job_number)
    
        if DEBUG:
            print('starts_immediately:', starts_immediately)
            print('depends on:', depends_on)
            print('precedes:', precedes)
            print()
    
        jobs_completed = Event()
    
        jobs_to_complete = len(jobs)
    
        with Pool() as pool:
            def my_callback(job_number, result):
                nonlocal jobs_to_complete
    
                jobs_to_complete -= 1
                if jobs_to_complete == 0: # We have completed all jobs:
                    jobs_completed.set()
                    return
    
                for job_number_2 in precedes[job_number]:
                    s = depends_on[job_number_2]
                    s.remove(job_number) # This dependency completed
                    if not s: # No more dependencies to wait for:
                        pool.apply_async(worker, args=(jobs[job_number_2][0],), callback=partial(my_callback, job_number_2))
    
            # The jobs we can initially submit to get things rolling:
            for job_number in starts_immediately:
                pool.apply_async(worker, args=(jobs[job_number][0],), callback=partial(my_callback, job_number))
            jobs_completed.wait() # Wait for all jobs to complete
    
    if __name__ == '__main__':
        jobs = [[(2, 'dog'), None],
                [(-1, 'cat'), (0,)],
                [(-1, 'Bob'), (1,)],
                [(7, 'Alice'), None],
                [(0, 'spam'), (2,3)]]
        main(jobs)
    

    Prints:

    starts_immediately: [0, 3]
    depends on: {1: {0}, 2: {1}, 4: {2, 3}}
    precedes: defaultdict(<class 'set'>, {0: {1}, 1: {2}, 2: {4}, 3: {4}})
    
    Starting work on: (2, 'dog')
    Starting work on: (7, 'Alice')
    Completed work on: (2, 'dog')
    Completed work on: (7, 'Alice')
    Starting work on: (-1, 'cat')
    Completed work on: (-1, 'cat')
    Starting work on: (-1, 'Bob')
    Completed work on: (-1, 'Bob')
    Starting work on: (0, 'spam')
    Completed work on: (0, 'spam')
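
    One thing to be aware of with the callback approach: apply_async only invokes callback when the worker succeeds, so a worker that raises would leave jobs_completed.wait() blocked unless an error_callback is supplied as well. As a rough alternative sketch, not the method above, the scheduling can be kept in a loop in the main process using concurrent.futures, where Future.result() re-raises any worker exception. The names run_jobs, waiting and running are made up for illustration, and worker here is a stand-in that just echoes its argument:

```python
import concurrent.futures as cf


def worker(tpl):
    # Hypothetical stand-in for the real function: it just echoes its argument.
    return tpl


def run_jobs(jobs, func, executor):
    """Apply func to each job's arguments, honouring dependencies.

    Returns a dict mapping job number to func's result.
    """
    # Jobs not yet submitted, each with the dependencies it still waits for.
    waiting = {n: set(deps or ()) for n, (_, deps) in enumerate(jobs)}
    running = {}  # Future -> job number
    results = {}
    while waiting or running:
        # Submit every job whose dependencies have all completed.
        for n in [n for n, deps in waiting.items() if not deps]:
            del waiting[n]
            running[executor.submit(func, jobs[n][0])] = n
        if not running:
            # Nothing is running and nothing can start: cycle or bad index.
            raise ValueError(
                f'unsatisfiable dependencies for jobs {sorted(waiting)}')
        # Block until at least one running job finishes.
        done, _ = cf.wait(list(running), return_when=cf.FIRST_COMPLETED)
        for future in done:
            n = running.pop(future)
            results[n] = future.result()  # re-raises an exception from func
            for deps in waiting.values():
                deps.discard(n)  # this dependency is now satisfied
    return results


if __name__ == '__main__':
    jobs = [[(2, 'dog'), None],
            [(-1, 'cat'), (0,)],
            [(-1, 'Bob'), (1,)],
            [(7, 'Alice'), None],
            [(0, 'spam'), (2, 3)]]
    with cf.ProcessPoolExecutor() as executor:
        print(run_jobs(jobs, worker, executor))
```

    The main process sleeps inside wait() rather than occupying a worker, so the pool's processes stay free for jobs that are ready. Scanning waiting after every completion is linear in the number of pending jobs; for a very large job list, a precedes mapping like the one in the answer above would avoid that scan.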