python, artificial-intelligence, kedro, mlops

Waiting for nodes to finish in Kedro


I have a pipeline in Kedro that looks like this:

from kedro.pipeline import Pipeline, node
from .nodes import *

def foo():
    return Pipeline([
        node(a, inputs=["train_x", "test_x"], outputs=dict(bar_a="bar_a"), name="A"),
        node(b, inputs=["train_x", "test_x"], outputs=dict(bar_b="bar_b"), name="B"),
        node(c, inputs=["train_x", "test_x"], outputs=dict(bar_c="bar_c"), name="C"),
        node(d, inputs=["train_x", "test_x"], outputs=dict(bar_d="bar_d"), name="D"),
    ])

The nodes A, B, and C are not very resource-intensive, but they take a while, so I want to run them in parallel. Node D, on the other hand, uses pretty much all my memory and will fail if it is executed alongside the other nodes. Is there a way to tell Kedro to wait for A, B, and C to finish before executing node D, while keeping the code organized?


Solution

  • Kedro determines the execution order based on the interdependencies between the inputs and outputs of different nodes. In your case, node D doesn't depend on any of the other nodes, so the execution order cannot be guaranteed. Likewise, there is no way to ensure that node D will not run in parallel with A, B, and C when using a parallel runner.

    That said, there are a couple of workarounds you could use to achieve a particular execution order.

    1 [Preferred] Run the nodes separately

    Instead of doing kedro run --parallel, you could do:

    kedro run --pipeline foo --node A --node B --node C --parallel; kedro run --pipeline foo --node D
    

    This is arguably the preferred solution because it requires no code changes (which is good in case you ever run the same pipeline on a different machine). You could use && instead of ; if you want node D to run only if A, B, and C succeeded. If the running logic gets more complex, you could store it in a Makefile or bash script, as sketched below.
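
    For example, a minimal bash script along these lines (the script name is made up; the && makes node D conditional on the earlier runs succeeding):

    #!/usr/bin/env bash
    # run_foo.sh -- hypothetical wrapper around the two kedro invocations.
    # Run the lightweight nodes in parallel first; && ensures that the
    # memory-hungry node D only starts if A, B, and C all succeeded.
    kedro run --pipeline foo --node A --node B --node C --parallel &&
        kedro run --pipeline foo --node D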

    2 Using dummy inputs/outputs

    You could also force the execution order by introducing dummy datasets. Something like:

    def foo():
        return Pipeline([
            node(a, inputs=["train_x", "test_x"], outputs=dict(bar_a="bar_a", done="a_done"), name="A"),
            node(b, inputs=["train_x", "test_x"], outputs=dict(bar_b="bar_b", done="b_done"), name="B"),
            node(c, inputs=["train_x", "test_x"], outputs=dict(bar_c="bar_c", done="c_done"), name="C"),
            node(d, inputs=["train_x", "test_x", "a_done", "b_done", "c_done"], outputs=dict(bar_d="bar_d"), name="D"),
        ])
    

    Note that a node's outputs must be a string, a list of strings, or a dict, so the dummy output is added as an extra key in the outputs dict. The dummy values themselves can be trivial, e.g. empty lists. The underlying functions also have to return/accept the additional values, as sketched below.
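
    A rough sketch of the corresponding function changes (the bodies are placeholders; the done key matches the outputs dicts above):

    def a(train_x, test_x):
        bar_a = ...  # the real work, unchanged
        # "done" maps to the a_done dummy dataset via the node's outputs dict
        return dict(bar_a=bar_a, done=[])

    def d(train_x, test_x, a_done, b_done, c_done):
        # the dummy arguments only enforce execution order and can be ignored
        return dict(bar_d=...)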

    The advantage of this approach is that kedro run --parallel will immediately result in the desired execution order. The disadvantage is that it pollutes the definitions of the nodes and the underlying functions.

    If you go down this road, you'll also have to decide whether you want to store the dummy datasets in the data catalog (which pollutes it even more, but allows you to run node D on its own) or not (in which case node D cannot run on its own); see the sketch below.
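
    If you do persist them, here is a minimal sketch of the catalog side (the file paths and the choice of PickleDataSet are assumptions; in a project these would normally be catalog.yml entries rather than code):

    from kedro.extras.datasets.pickle import PickleDataSet
    from kedro.io import DataCatalog

    # Persisted dummy datasets, so a later `kedro run --node D` can load them
    catalog = DataCatalog({
        "a_done": PickleDataSet(filepath="data/02_intermediate/a_done.pkl"),
        "b_done": PickleDataSet(filepath="data/02_intermediate/b_done.pkl"),
        "c_done": PickleDataSet(filepath="data/02_intermediate/c_done.pkl"),
    })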


    Related discussions [1, 2]