Tags: python, multithreading, python-2.7, directed-acyclic-graphs

Parallel processing of a DAG


I'm trying hard to figure out how I can process a directed acyclic graph in parallel. Each node should only be able to "execute" once all of its input nodes have been processed. Imagine a class Task with the following interface:

class Task(object):
    result = None
    def inputs(self):
        ''' List all requirements of the task. '''
        return ()
    def run(self):
        pass
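To make the interface concrete, here is a minimal sketch with two hypothetical subclasses (`Const` and `Add`) wired into a diamond, where `a` is an input of both `b` and `c`. `run_sequential` is likewise a hypothetical single-threaded baseline: a depth-first post-order traversal that runs each task exactly once, and only after all of its inputs. It assumes the graph is acyclic; any parallel runner should produce the same results as this one.

```python
class Task(object):
    """The interface from the question."""
    result = None
    def inputs(self):
        ''' List all requirements of the task. '''
        return ()
    def run(self):
        pass


class Const(Task):
    """Hypothetical leaf task: produces a fixed value."""
    def __init__(self, value):
        self.value = value
    def run(self):
        self.result = self.value


class Add(Task):
    """Hypothetical task: sums the results of its inputs."""
    def __init__(self, *deps):
        self.deps = deps
    def inputs(self):
        return self.deps
    def run(self):
        self.result = sum(dep.result for dep in self.deps)


def run_sequential(root):
    """Single-threaded baseline: depth-first post-order, each task run once."""
    done = set()
    def visit(task):
        if task in done:
            return
        for dep in task.inputs():
            visit(dep)       # every input finishes before this task runs
        task.run()
        done.add(task)       # shared inputs are skipped on later visits
    visit(root)


# Diamond: a feeds both b and c, which both feed d.
a = Const(1)
b = Add(a)
c = Add(a, Const(2))
d = Add(b, c)
run_sequential(d)
print(d.result)  # 4
```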

I cannot think of a way to process the graph represented by this structure asynchronously, with a maximum number of workers running at the same time, except for one method.

I think the optimal processing would be achieved by creating a thread for each task that waits for all of its inputs to be processed. But spawning a thread for every task immediately, instead of only when the task is ready to be processed, does not sound like a good idea to me.

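import threading

class Runner(threading.Thread):
    ''' Runs a task in its own thread once all of its inputs have run. '''
    def __init__(self, task):
        super(Runner, self).__init__()
        self.task = task
        self.start()
    def run(self):
        # Spawn one Runner per input, then block until all of them finish.
        # Note: an input shared by several tasks is run once per dependent.
        threads = [Runner(r) for r in self.task.inputs()]
        for t in threads:
            t.join()
        self.task.run()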

Is there a better way to achieve this behaviour? Also, this approach currently has no way to limit the number of tasks running at a time.
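The thread-per-task design can be kept while adding the missing limit. This is a minimal sketch, assuming Python 3 (it uses nothing 2.7 lacks); `run_threaded` and the demo `SleepTask` are hypothetical names. A `threading.BoundedSemaphore` caps how many `task.run()` calls execute at once, and a lock-protected dict gives each task exactly one thread, so an input shared by several dependents runs only once. Threads that are merely waiting on inputs hold no slot, but a thread is still created for every task well before most of them can run, which is the overhead the question worries about.

```python
import threading
import time


class Task(object):
    """The interface from the question."""
    result = None
    def inputs(self):
        return ()
    def run(self):
        pass


def run_threaded(root, max_workers):
    """Thread-per-task runner with a cap on concurrently running tasks."""
    slots = threading.BoundedSemaphore(max_workers)  # at most max_workers run() calls
    threads = {}                # task -> its single thread
    lock = threading.Lock()     # guards `threads`

    def thread_for(task):
        # Create and start a task's thread the first time it is requested;
        # a task shared by several dependents therefore runs only once.
        with lock:
            thread = threads.get(task)
            if thread is None:
                thread = threading.Thread(target=execute, args=(task,))
                threads[task] = thread
                thread.start()
            return thread

    def execute(task):
        # Start (or reuse) the threads of all inputs, then wait for them.
        waiting = [thread_for(dep) for dep in task.inputs()]
        for thread in waiting:
            thread.join()
        # A slot is taken only while run() executes, not while waiting.
        with slots:
            task.run()

    thread_for(root).join()


class SleepTask(Task):
    """Hypothetical task that sleeps briefly and tracks peak concurrency."""
    lock = threading.Lock()
    active = 0
    peak = 0
    log = []

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = tuple(deps)

    def inputs(self):
        return self.deps

    def run(self):
        with SleepTask.lock:
            SleepTask.active += 1
            SleepTask.peak = max(SleepTask.peak, SleepTask.active)
            SleepTask.log.append(self.name)
        time.sleep(0.05)
        with SleepTask.lock:
            SleepTask.active -= 1


# "base" is shared by four leaves, which all feed "top".
base = SleepTask("base")
leaves = [SleepTask("leaf%d" % i, [base]) for i in range(4)]
top = SleepTask("top", leaves)
run_threaded(top, max_workers=2)
print(SleepTask.peak, SleepTask.log)  # peak <= 2; "base" runs once, first; "top" last
```

In this sketch, an exception raised by `task.run()` only ends that task's thread; dependents are not told and will still run.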


Solution

  • Stumbling back to this question years later, I would recommend that anyone going down this road look at the topological sort algorithm (specifically Kahn's algorithm). Treat each edge as pointing from an input to the task that requires it. At every step in the algorithm, you'll be looking at all the nodes in the graph that have an in-degree of zero (0), i.e. nodes whose inputs have all been processed. All such nodes may be processed in parallel. Limiting the number of nodes being processed in parallel then becomes quite trivial: you simply don't push all of those nodes into the worker queue at once. Once a node is fully processed, remove it (and its outgoing edges) from the graph, which should leave new nodes with an in-degree of zero (0), unless there's a cycle in the graph.
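The approach in this answer can be sketched as follows. This is a minimal sketch, assuming Python 3's `concurrent.futures` (on 2.7, the `futures` backport from PyPI provides the same module); `run_topological` and the demo `Named` task are hypothetical names. It discovers the graph from a root task, keeps a count of unfinished inputs per task (its in-degree), and hands at most `max_workers` ready tasks to the pool at a time, queueing the rest. Any task whose count never reaches zero must be part of, or depend on, a cycle.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


class Task(object):
    """The interface from the question."""
    result = None
    def inputs(self):
        return ()
    def run(self):
        pass


def run_topological(root, max_workers):
    """Run every task reachable from root, each after all of its inputs,
    with at most max_workers tasks executing at once (Kahn's algorithm)."""
    # 1. Discover the graph. Edges point from an input to the task needing it,
    #    so a task's in-degree is the number of its inputs not yet finished.
    in_degree = {}
    dependents = {}
    stack = [root]
    while stack:
        task = stack.pop()
        if task in in_degree:
            continue
        deps = task.inputs()
        in_degree[task] = len(deps)
        dependents.setdefault(task, [])
        for dep in deps:
            dependents.setdefault(dep, []).append(task)
            stack.append(dep)

    # 2. Tasks with in-degree zero are ready and may run in parallel.
    ready = deque(t for t, n in in_degree.items() if n == 0)
    running = {}  # future -> task
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while ready or running:
            # Hand out only as many ready tasks as there are free workers.
            while ready and len(running) < max_workers:
                task = ready.popleft()
                running[pool.submit(task.run)] = task
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                task = running.pop(future)
                future.result()  # re-raise an exception from task.run()
                # 3. "Remove" the finished task: each dependent loses one edge.
                for child in dependents[task]:
                    in_degree[child] -= 1
                    if in_degree[child] == 0:
                        ready.append(child)

    # Anything that never reached in-degree zero is on, or behind, a cycle.
    if any(n > 0 for n in in_degree.values()):
        raise ValueError("graph contains a cycle")


class Named(Task):
    """Hypothetical task that records its name in a shared log when run."""
    log = []
    def __init__(self, name, *deps):
        self.name, self.deps = name, deps
    def inputs(self):
        return self.deps
    def run(self):
        Named.log.append(self.name)  # list.append is thread-safe in CPython


# Diamond: a feeds b and c, which both feed d.
a = Named("a")
b = Named("b", a)
c = Named("c", a)
d = Named("d", b, c)
run_topological(d, max_workers=2)
print(Named.log)  # "a" first, "d" last, "b" and "c" in between in either order
```

Unlike the thread-per-task version, no worker is ever blocked waiting on an input: only tasks that are actually ready occupy one of the `max_workers` threads.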