debugging workflow luigi

luigi: re-running tasks for debugging purposes?


I'm writing some luigi workflows, and I am trying to debug the tasks. In order to do so, I need to re-run these tasks with the same parameters over and over, until, finally, they do what I want.

I understand that luigi tasks are idempotent and are therefore not normally re-run when given the same inputs as before, and this is exactly what is desired once the workflow is debugged and in production. However, during development, re-running workflows with the exact same inputs and outputs is useful -- and, I would argue, necessary.

I know I can override the complete() method to return False in each task during development. However, this leaves the tasks in an uncompleted state.

I'm looking for a way to set my workflow to run in some sort of "development" or "debug" mode, so I can run and re-run it over and over again to completion, even when all the tasks run correctly, until I'm sure that the workflow is doing exactly what I want.

Is there any way to do this in luigi?

Thank you in advance.

================ added later ================

Per my comment below, it appears that changing the input parameters to a task will not cause it to be re-run. Only if its output() method returns a unique value will that task be re-runnable. This seems to go against the definition of "idempotent", since changing the input parameters should treat a truly idempotent task as a new, unique entity, irrespective of whether it happens to return the same output as another invocation with different input parameters.

The following code illustrates the problem. The "x" parameter determines the file name that the output() method returns, while the "y" parameter is used within the contents of the output, but not for the name of the output file.

If I call my workflow with "--x 10 --y 20" and then "--x 10 --y 30", the second invocation does not cause either of the tasks to be re-run. This, I believe, is incorrect behavior. However, if I call the workflow with "--x 10 --y 20" followed by "--x 11 --y 20", both tasks will indeed be re-run.

#!/usr/bin/python3
# -*- python -*-

import luigi

class Child(luigi.Task):

    x = luigi.Parameter()
    y = luigi.Parameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("child_{}.txt".format(self.x))

    def run(self):
        with self.output().open('w') as f:
            f.write('{} {}\n'.format(self.x, self.y))

class Parent(luigi.Task):

    x = luigi.Parameter()
    y = luigi.Parameter()

    def requires(self):
        return [ Child(self.x, self.y) ]

    def output(self):
        return luigi.LocalTarget("parent_{}.txt".format(self.x))

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                fout.write("from command line: --x {} --y {}, from child: {}\n".format(self.x, self.y, line.strip()))

if __name__ == '__main__':
    luigi.run()

Solution

  • As you said, a debug mode would be great, but as far as I know Luigi does not have anything like that.

    A trick you can use is to remove a task's targets before its complete() method is called, as shown here. Your task must be a subclass of that class, so that you can use the --force parameter to reset it before executing it.

    Note that this solution only works if your tasks have local files as output. You must customize it in order to delete S3 keys/buckets, database tables or rows, and so on.