
Retry .complete() for WrapperTask


I am using Luigi to run several tasks, and then I need to bulk transfer the output to a standardized file location. I've written a WrapperTask with an overridden complete() method to do this:

import luigi
import luigi.s3  # moved to luigi.contrib.s3 in later Luigi releases
from luigi.task import flatten

class TaskX(luigi.WrapperTask):
    date = luigi.DateParameter()
    client = luigi.s3.S3Client()

    def requires(self):
        yield TaskA(date=self.date)
        yield TaskB(date=self.date)

    def complete(self):
        # True only once every required task reports itself complete
        tasks_complete = all(r.complete() for r in flatten(self.requires()))

        # at the end of everything, batch copy the files
        if tasks_complete:
            self.client.copy('current-old', 'current')
            return True
        else:
            return False


if __name__ == "__main__":
    luigi.run()

but I'm having trouble getting the conditional part of complete() to be called once the process has actually finished.

I assume this is because of asynchronous behavior pointed out by others, but I'm not sure how to fix it.

I've tried running Luigi with these command-line parameters:

$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task

But that doesn't seem to be working correctly. Is this the right approach to handle this type of task?

Also, has anyone had experience with the --worker-retry-external-task flag? I'm having some trouble understanding it.

In the source code,

def _is_external(task):
    return task.run is None or task.run == NotImplemented

is called to determine whether or not a Luigi task has a run() method, which I assumed a WrapperTask does not. Thus, I'd expect the --worker-retry-external-task flag to keep retrying complete() for this task until it returns True, thus performing the copy. However, playing around in the interpreter suggests otherwise:

>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True

So the _is_external check does not appear to do what I expected: task.run is a bound method, so it is never None, even though calling it returns None.
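
The distinction in the interpreter session can be reproduced without Luigi. The sketch below is only an illustration: PlainTask and ExternalLikeTask are hypothetical stand-ins, not Luigi classes, and the check is copied from the snippet quoted above. It assumes that a task only counts as external when run is explicitly set to None (or NotImplemented) on the class, which is my understanding of how luigi.ExternalTask is defined.

```python
def _is_external(task):
    # Same check as the Luigi snippet quoted above.
    return task.run is None or task.run == NotImplemented


class PlainTask:
    """Hypothetical stand-in for a task that inherits a do-nothing run()."""

    def run(self):
        pass  # calling this returns None, but the attribute itself is a method


class ExternalLikeTask:
    """Hypothetical stand-in for a task that declares it has no run step."""

    run = None


plain = PlainTask()
external = ExternalLikeTask()

plain_is_external = _is_external(plain)        # compares the bound method
external_is_external = _is_external(external)  # compares the None attribute
run_result = plain.run()                       # the return value of the call

print(plain_is_external, external_is_external, run_result is None)
```

If that assumption holds, a WrapperTask would not be treated as external, because it inherits a real (if empty) run() method, and the retry flag would not apply to it.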

Am I off-base here?


Solution

  • I still think that overriding .complete() should, in theory, have been able to do this, and I'm still not sure why it doesn't. But if you just need to bulk-transfer files after a process has run, a workable solution is to do the transfer inside a .run() method:

    def run(self):
        logger.info('transferring into current directory')
        self.client.copy('current-old', 'current')
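
One possible explanation for why run() works where complete() did not can be sketched with a toy model. Everything here is an assumption for illustration, not Luigi's code: toy_schedule is a hypothetical, heavily simplified check-then-run loop in which complete() is consulted once before the dependencies run and is not consulted again after run() returns. ToyWrapper and ToyWrapperWithRun are hypothetical stand-ins for the two versions of TaskX, and the copies counter stands in for client.copy().

```python
class ToyWrapper:
    """Stand-in for the question's TaskX: the copy lives in complete()."""

    def __init__(self, deps_done=False):
        self.deps_done = deps_done
        self.copies = 0  # counts how often the "copy" happened

    def complete(self):
        if self.deps_done:
            self.copies += 1  # side effect hidden inside a status check
            return True
        return False

    def run(self):
        pass  # inherited no-op, as in a wrapper task


class ToyWrapperWithRun(ToyWrapper):
    """Stand-in for the answer's version: the copy lives in run()."""

    def complete(self):
        return self.deps_done  # pure status check, no side effects

    def run(self):
        self.copies += 1


def toy_schedule(task):
    """Assumed flow: check complete() once, run dependencies, then run()."""
    if task.complete():
        return "already complete"
    task.deps_done = True  # the required tasks run here
    task.run()
    return "ran"  # marked done without calling complete() again


copy_in_complete = ToyWrapper()
status_a = toy_schedule(copy_in_complete)

copy_in_run = ToyWrapperWithRun()
status_b = toy_schedule(copy_in_run)

print(status_a, copy_in_complete.copies)
print(status_b, copy_in_run.copies)
```

In this model the version with the copy in complete() never performs it, because the only complete() call happens before the dependencies have finished. The same model also suggests a caveat for the run() version: if the required tasks are already complete when the wrapper is scheduled, run() is skipped as well, so keeping complete() free of side effects and tracking the copy with its own output may be the more robust design.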