Creating threads in python-multithreading


I have the following thread structure:

               (1)
            /   |   \
          (2)  (3)  (4)
           |    |    |
          (5)  (6)  (7)
           |    |    |
          (8)  (9)  (10)
           |    |    |
          (11) (12) (13)
            \  /     |
            (14)     |
              \     /
                (15)

As you can see, the first function starts three threads, and each of those then starts the next one in its chain. Node 14 joins nodes 11 and 12; node 15 joins nodes 13 and 14. I implemented the first two levels (nodes 1, 2, 3, 4) as follows:

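def first(self):
    # Avoid naming the variable `list`, which would shadow the built-in.
    items = ['a', 'b', 'c']
    # Requires `import concurrent.futures` at the top of the module.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # map() runs self.do_this on each item in a worker thread and
        # yields the results in input order.
        results = executor.map(self.do_this, items)
        for result in results:
            print(result)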

But I have no idea where to go from here.


Solution

  • Here's an example of a runner for a graph like yours.

    The idea is that you define a function that runs each task (do_task here), and build a graph of the (immediate) dependencies each task requires. The example task_deps below mirrors your graph from above.

    The run_graph function will then call do_task with each task ID; that function should do whatever is needed to compute the task's result (it can read the results of any earlier task if it needs to).

    The run_graph function will eventually return a dict of {task_id: result}.
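
As a minimal sketch of a task function that actually reads earlier results, the snippet below uses the same `(task_id, results, dependencies)` signature as do_task. The name `sum_task` and the numeric results are assumptions made up for illustration; they are not part of the answer's code.

```python
def sum_task(task_id, results, dependencies):
    """Hypothetical runner: add the task's own ID to its dependencies' results."""
    # `results` maps already-finished task IDs to their return values.
    return task_id + sum(results[dep] for dep in dependencies)


# Pretend tasks 11 and 12 have already finished with these values.
finished = {11: 11, 12: 12}
# Task 14 joins 11 and 12, so it may read both of their results.
finished[14] = sum_task(14, finished, (11, 12))
print(finished[14])  # 37
```

Because a task is only started once all of its dependencies are present in `results`, the lookups `results[dep]` cannot fail for a correctly declared graph.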

    The code below outputs

    Scheduling {1}
    Scheduling {2, 3, 4}
    Scheduling {5, 6, 7}
    Scheduling {8, 9, 10}
    Scheduling {11, 12, 13}
    Scheduling {14}
    Scheduling {15}
    

    which, as expected, corresponds exactly to the levels of your graph from top to bottom, and

    {1: 'Task 1 completed with result 42',
     2: 'Task 2 completed with result 84',
     3: 'Task 3 completed with result 126',
     4: 'Task 4 completed with result 168',
     5: 'Task 5 completed with result 210',
     6: 'Task 6 completed with result 252',
     7: 'Task 7 completed with result 294',
     8: 'Task 8 completed with result 336',
     9: 'Task 9 completed with result 378',
     10: 'Task 10 completed with result 420',
     11: 'Task 11 completed with result 462',
     12: 'Task 12 completed with result 504',
     13: 'Task 13 completed with result 546',
     14: 'Task 14 completed with result 588',
     15: 'Task 15 completed with result 630'}
    

    import concurrent.futures
    
    
    def do_task(task_id, results, dependencies):
        # sanity check - this function could use `dependencies` and `results` too
        assert all(dep in results for dep in dependencies)
        return f"Task {task_id} completed with result {task_id * 42}"
    
    
    def run_graph(task_dependencies, runner):
        # Dict for results for each task.
        results = {}
        # Set of tasks yet to be completed.
        todo = set(task_dependencies)
    
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # While there are items in the to-do set...
            while todo:
                # ... figure out what we can immediately execute by
                # comparing the dependency set to the result keys we already have
                # (i.e. the complement of the to-do set)
                next_tasks = {
                    task_id
                    for (task_id, deps) in task_dependencies.items()
                    if task_id in todo and set(deps) <= set(results)
                }
                # If there are no next tasks we could schedule, it means the dependency
                # graph is incorrect (or at the very least cannot be completed).
                if not next_tasks:
                    raise RuntimeError(
                        f"Unable to schedule tasks, bad dependencies? Todo: {todo}"
                    )
    
                print("Scheduling", next_tasks)
                # Submit tasks for execution in parallel. `futures` will be a list of
                # 2-tuples (task_id, future).
                futures = [
                    (
                        task_id,
                        executor.submit(
                            runner, task_id, results, task_dependencies[task_id]
                        ),
                    )
                    for task_id in next_tasks
                ]
    
                # Loop over the futures, waiting for their results; when a future
                # finishes, save the result value and remove that task from the
                # to-do set.
                for (task_id, future) in futures:
                    results[task_id] = future.result()
                    todo.remove(task_id)
        # Once the while loop finishes, we have our results.
        return results
    
    
    if __name__ == "__main__":
        task_deps = {
            1: (),
            2: (1,),
            3: (1,),
            4: (1,),
            5: (2,),
            6: (3,),
            7: (4,),
            8: (5,),
            9: (6,),
            10: (7,),
            11: (8,),
            12: (9,),
            13: (10,),
            14: (11, 12),
            15: (14, 13),
        }
    
        result = run_graph(task_deps, do_task)
        print(result)
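
One limitation of run_graph is that it waits for a whole batch to finish before scheduling the next one, so a slow task 2 would also hold back tasks 6 and 7. A possible variant, sketched here under the assumption that Python 3.9+ is available, uses the standard library's `graphlib.TopologicalSorter` together with `concurrent.futures.wait` to start each task as soon as its own dependencies are done. The name `run_graph_eager` is hypothetical; it takes the same arguments as run_graph and is meant as a sketch, not a drop-in replacement.

```python
import concurrent.futures
import graphlib  # standard library since Python 3.9


def run_graph_eager(task_dependencies, runner):
    """Hypothetical variant of run_graph that does not wait for whole batches."""
    # The sorter takes a dict of {node: iterable of predecessors}, which is
    # the same shape as the task_deps dict.
    sorter = graphlib.TopologicalSorter(task_dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    results = {}
    running = {}  # maps each in-flight future to its task ID

    with concurrent.futures.ThreadPoolExecutor() as executor:
        while sorter.is_active():
            # Submit every task whose dependencies have all been marked done.
            for task_id in sorter.get_ready():
                future = executor.submit(
                    runner, task_id, results, task_dependencies[task_id]
                )
                running[future] = task_id
            # Block until at least one in-flight task has finished.
            done, _ = concurrent.futures.wait(
                list(running), return_when=concurrent.futures.FIRST_COMPLETED
            )
            for future in done:
                task_id = running.pop(future)
                results[task_id] = future.result()  # re-raises task errors
                sorter.done(task_id)  # makes this task's successors ready
    return results
```

Calling `run_graph_eager(task_deps, do_task)` should return the same dict contents as run_graph, although the insertion order of the keys may differ because tasks are recorded in completion order. As in the original, worker threads only read `results` while the main thread writes to it.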