Tags: dependencies, yield, luigi

luigi: task runs other tasks without creating a dependency?


In luigi, I understand that if a task yields to another task, the second task becomes a new dependency of the original task, and this causes the original task's run method to be re-run from the beginning after the yielded task completes.
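
To make that re-run behaviour concrete, here is a toy, stdlib-only model of a worker loop driving a generator-style `run()`. It is not luigi's code, and `ToyTask`, `drive`, and `Parent` are hypothetical names. It only mimics the documented semantics: when `run()` yields tasks that are not yet complete, that attempt is abandoned, the yielded tasks run, and `run()` is later started again from the top. If every yielded task is already complete, the generator is simply resumed in place.

```python
# Toy model only: NOT luigi's implementation. ToyTask, drive and Parent
# are hypothetical names that mimic luigi's dynamic-dependency semantics.


class ToyTask:
    def __init__(self):
        self.done = False          # stands in for "output() exists"

    def complete(self):
        return self.done

    def run(self):                 # plain (non-generator) run()
        self.done = True


def drive(task):
    """Run `task`, restarting run() whenever it yields incomplete tasks.

    Returns how many times task.run() was started.
    """
    starts = 0
    while True:
        starts += 1
        gen = task.run()
        if gen is None:                     # not a generator: finished
            return starts
        missing = []
        try:
            deps = next(gen)
            while True:
                deps = deps if isinstance(deps, list) else [deps]
                missing = [d for d in deps if not d.complete()]
                if missing:
                    gen.close()             # abandon this attempt
                    break
                # All complete: resume in place. (luigi sends back the
                # yielded tasks' outputs here; this toy sends None.)
                deps = gen.send(None)
        except StopIteration:
            return starts
        for dep in missing:                 # run the new dependencies ...
            drive(dep)
        # ... then loop and start task.run() again from the top.


class Parent(ToyTask):
    def __init__(self):
        super().__init__()
        self.body_runs = 0
        self.child = ToyTask()

    def run(self):
        self.body_runs += 1        # work we would rather not repeat
        yield self.child           # dynamic dependency
        self.done = True


parent = Parent()
print(drive(parent), parent.body_runs)  # 2 2
```

The code before the `yield` runs once per start, which is exactly the repetition the question wants to avoid.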

However, in certain cases, I would like a task to defer to another task without the deferred-to task becoming a dependency. I want this because I don't want my current task's run method to be re-run after the other task completes.

Yes, I know that my run method should be idempotent. Nonetheless, there are cases where I absolutely do not want that method to be run a second time after yielding to the other task.

I figured out a way to do this, but I'm not sure it's the best solution, and I'd welcome any suggestions.

Suppose that there are two tasks: MainTask and OtherTask. MainTask is invoked via the command line using various parameters. Depending on the settings of these parameters, MainTask might invoke OtherTask. If so, I do not want the run method of MainTask to be invoked a second time.

import luigi


class OtherTask(luigi.Task):
    # Under some circumstances, this task can be invoked
    # from the command line, and it can also be invoked
    # in the normal luigi manner as a dependency for one
    # or more other tasks.
    # It also might be yielded to, as is done in the
    # "run" method for `MainTask`, below.

    id = luigi.parameter.IntParameter()

    def complete(self):
        # ...
        # Return True or False depending on various tests.
        # Placeholder: the default checks that output() exists.
        return super().complete()

    def requires(self):
        # Placeholder for [ ... various dependencies ... ]
        return []

    def run(self):
        # do stuff with self.id
        # ...
        with self.output().open('w') as f:
            f.write('OK')

    def output(self):
        # Path elided in the original; it must be a Target, not a str.
        return luigi.LocalTarget('... something ...')

class MainTask(luigi.Task):
    # Parameters are expected to be supplied on the command line.
    param1 = luigi.parameter.IntParameter()
    param2 = luigi.parameter.BoolParameter()
    # ... etc. ...

    def run(self):
        # Here's how I keep this "run" method from being
        # invoked more than once. Is there a better way
        # to invoke `OtherTask` without having it cause
        # this current task to be re-invoked?
        if self.complete():
            return

        # Normal "run" processing for this task ...
        # ... etc. ...

        # Possibly run `OtherTask` multiple times, only if
        # certain conditions are met ...
        tasks = []
        if the_conditions_are_met:  # placeholder condition
            ids = []
            # Append multiple integer IDs to the `ids` list.
            # Calculate each ID depending upon the values
            # passed in via self.param1, self.param2, etc.
            # Do some processing depending on these IDs.
            # ... etc. ...

            # Then, create a list of tasks to be invoked,
            # each one taking one of these IDs as a parameter.
            for the_id in ids:
                tasks.append(OtherTask(id=the_id))

        with self.output().open('w') as f:
            f.write('OK')

        # Optionally yield after marking this task as
        # complete, so that when the yielded tasks have
        # all run, this task's "run" method can test for
        # completion and not re-run its logic.
        if tasks:
            yield tasks

    def output(self):
        # Path elided in the original; it must be a Target, not a str.
        return luigi.LocalTarget('... whatever ...')
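
The guard above can be checked with the same kind of toy model (stdlib-only, not luigi's code; `ToyTask`, `drive`, and `GuardedMain` are hypothetical names). Because the output is written before the `yield`, the second start of `run()` hits `if self.complete(): return`, so `run()` is started twice but its body executes only once.

```python
# Toy model only: NOT luigi's implementation. GuardedMain mirrors the
# question's MainTask: write output first, then yield the other tasks.


class ToyTask:
    def __init__(self):
        self.done = False          # stands in for "output() exists"

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def drive(task):
    """Restart task.run() while it yields incomplete tasks.

    Returns how many times task.run() was started.
    """
    starts = 0
    while True:
        starts += 1
        gen = task.run()
        if gen is None:
            return starts
        missing = []
        try:
            deps = next(gen)
            while True:
                deps = deps if isinstance(deps, list) else [deps]
                missing = [d for d in deps if not d.complete()]
                if missing:
                    gen.close()             # abandon this attempt
                    break
                deps = gen.send(None)       # all complete: resume
        except StopIteration:
            return starts
        for dep in missing:
            drive(dep)


class GuardedMain(ToyTask):
    def __init__(self, n_children):
        super().__init__()
        self.body_runs = 0
        self.children = [ToyTask() for _ in range(n_children)]

    def run(self):
        if self.complete():        # the question's guard
            return
        self.body_runs += 1        # "normal run processing"
        self.done = True           # output written *before* yielding
        if self.children:
            yield self.children


main = GuardedMain(3)
print(drive(main), main.body_runs)  # 2 1
```

One trade-off the model makes visible: the output exists before the yielded tasks have run. With luigi's default output-based `complete()`, if one of the yielded tasks fails, MainTask will still look complete afterwards.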

Solution

  • Per my comment, moving the ID calculation into an auxiliary task, which MainTask declares as a static dependency in requires(), seems to work. The auxiliary task runs only once, so even if MainTask's run method is invoked more than once, it simply re-reads the auxiliary task's output instead of recalculating it.

    import json

    import luigi


    class OtherTask(luigi.Task):
        # Under some circumstances, this task can be invoked
        # from the command line, and it can also be invoked
        # in the normal luigi manner as a dependency for one
        # or more other tasks.
        # It also might be yielded to, as is done in the
        # "run" method for `MainTask`, below.

        id = luigi.parameter.IntParameter()

        def complete(self):
            # ...
            # Return True or False depending on various tests.
            # Placeholder: the default checks that output() exists.
            return super().complete()

        def requires(self):
            # Placeholder for [ ... various dependencies ... ]
            return []

        def run(self):
            # do stuff with self.id
            # ...
            with self.output().open('w') as f:
                f.write('OK')

        def output(self):
            # Path elided in the original; it must be a Target, not a str.
            return luigi.LocalTarget('... something ...')


    class AuxiliaryTask(luigi.Task):
        # Declares the same parameters as MainTask, so that
        # MainTask's self.clone(AuxiliaryTask) passes them through.
        param1 = luigi.parameter.IntParameter()
        param2 = luigi.parameter.BoolParameter()
        # ... etc. ...

        def requires(self):
            # Placeholder for [ ... various dependencies ... ]
            return []

        def run(self):
            ids = []
            # Append multiple integer IDs to the `ids` list.
            # Calculate each ID depending upon the values
            # passed to this task via its parameters. Then ...
            with self.output().open('w') as f:
                json.dump(ids, f)

        def output(self):
            # Path elided in the original; it must be a Target, not a str.
            return luigi.LocalTarget('... something else ...')


    class MainTask(luigi.Task):
        # Parameters are expected to be supplied on the command line.
        param1 = luigi.parameter.IntParameter()
        param2 = luigi.parameter.BoolParameter()
        # ... etc. ...

        def requires(self):
            # A single static dependency, so self.input() is a
            # single Target rather than a list of Targets.
            return self.clone(AuxiliaryTask)

        def run(self):
            # This method could get re-run after the yield below.
            # However, it just re-reads its input instead of that
            # input being recalculated. And in the second invocation,
            # the yielded tasks are already complete, so luigi does
            # not run them again.
            with self.input().open('r') as f:
                # Parse the JSON list (json.load, not json.dumps).
                ids = json.load(f)

            if ids:
                # Create one task per ID and yield them all at once.
                tasks = [OtherTask(id=the_id) for the_id in ids]
                yield tasks

            with self.output().open('w') as f:
                f.write('OK')

        def output(self):
            # Path elided in the original; it must be a Target, not a str.
            return luigi.LocalTarget('... whatever ...')
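
The auxiliary-task pattern can be sketched in the same toy, stdlib-only model (not luigi's code). Here `STORE` is a hypothetical dict standing in for target storage, the driver also runs static `requires()` first, and the ID rule inside `AuxiliaryTask` is made up for illustration. `MainTask.run()` is started twice, but the ID calculation happens only once, because the second start merely re-reads the stored JSON.

```python
import json

# Toy model only: NOT luigi's implementation. STORE stands in for the
# targets on disk; the class names mirror the answer's tasks.
STORE = {}
COUNTS = {"aux": 0}            # how often the ID calculation ran


class ToyTask:
    def key(self):
        return type(self).__name__

    def complete(self):        # "complete" means "my output exists"
        return self.key() in STORE

    def requires(self):
        return []

    def run(self):
        STORE[self.key()] = "OK"


def drive(task):
    """Run static requires() first, then run(), restarting run() from
    the top whenever it yields incomplete tasks. Returns the number of
    times task.run() was started."""
    for dep in task.requires():
        if not dep.complete():
            drive(dep)
    starts = 0
    while True:
        starts += 1
        gen = task.run()
        if gen is None:
            return starts
        missing = []
        try:
            deps = next(gen)
            while True:
                deps = deps if isinstance(deps, list) else [deps]
                missing = [d for d in deps if not d.complete()]
                if missing:
                    gen.close()             # abandon this attempt
                    break
                deps = gen.send(None)       # all complete: resume
        except StopIteration:
            return starts
        for dep in missing:
            drive(dep)


class AuxiliaryTask(ToyTask):
    def __init__(self, param1):
        self.param1 = param1

    def key(self):
        return f"AuxiliaryTask-{self.param1}"

    def run(self):
        COUNTS["aux"] += 1
        ids = [self.param1 * 10 + i for i in range(3)]  # hypothetical rule
        STORE[self.key()] = json.dumps(ids)


class OtherTask(ToyTask):
    def __init__(self, id):
        self.id = id

    def key(self):
        return f"OtherTask-{self.id}"


class MainTask(ToyTask):
    def __init__(self, param1):
        self.param1 = param1
        self.run_starts = 0

    def key(self):
        return f"MainTask-{self.param1}"

    def requires(self):
        return [AuxiliaryTask(self.param1)]

    def run(self):
        self.run_starts += 1
        # Re-read the auxiliary output; json.loads parses it to a list.
        ids = json.loads(STORE[self.requires()[0].key()])
        if ids:
            yield [OtherTask(id=i) for i in ids]
        STORE[self.key()] = "OK"


main = MainTask(4)
print(drive(main), main.run_starts, COUNTS["aux"])  # 2 2 1
```

It also shows why parsing needs `json.loads` (or `json.load`): `json.dumps(f.read())` would re-serialize the already-encoded string, so iterating over the result yields single characters rather than integer IDs.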