I am using Luigi to run several tasks, and then I need to bulk transfer the output to a standardized file location. I've written a WrapperTask with an overridden complete()
method to do this:
import luigi
from luigi.task import flatten

class TaskX(luigi.WrapperTask):
    date = luigi.DateParameter()
    client = luigi.s3.S3Client()

    def requires(self):
        yield TaskA(date=self.date)
        yield TaskB(date=self.date)

    def complete(self):
        tasks_complete = all(r.complete() for r in flatten(self.requires()))
        # at the end of everything, batch copy the files
        if tasks_complete:
            self.client.copy('current-old', 'current')
            return True
        else:
            return False

if __name__ == "__main__":
    luigi.run()
but I'm having trouble getting the conditional part of complete()
to be called once the upstream tasks are actually finished.
I assume this is because of the asynchronous behavior others have pointed out, but I'm not sure how to fix it.
I've tried running Luigi with these command-line parameters:
$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task
But that doesn't seem to be working correctly. Is this the right approach to handle this type of task?
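To check whether complete() is even being invoked once the dependencies finish, a log line inside it makes the scheduler's calls visible. A quick sketch (using Luigi's 'luigi-interface' logger, though any logger would do):

import logging
logger = logging.getLogger('luigi-interface')  # Luigi's own log stream; any logger works

# inside TaskX
def complete(self):
    tasks_complete = all(r.complete() for r in flatten(self.requires()))
    # log every call so it's visible when (and how often) Luigi checks this task
    logger.info('TaskX.complete() called, dependencies complete: %s', tasks_complete)
    if tasks_complete:
        self.client.copy('current-old', 'current')
        return True
    return False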
Also, I'm curious: has anyone had experience with the --worker-retry-external-task
flag? I'm having some trouble understanding it.
In the source code,
def _is_external(task):
    return task.run is None or task.run == NotImplemented
is called to determine whether or not a Luigi Task has a run()
method, which a WrapperTask
does not. Thus, I'd expect the --retry-external-task
flag to keep retrying complete()
on this task until it returns True, thereby performing the copy. However, just playing around in the interpreter leads me to believe otherwise:
>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True
That check is not doing what it thinks it is: task.run is a bound method, so it is never None, even when the method body does nothing.
Am I off-base here?
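If the goal is to treat tasks whose run() is just the inherited no-op as external, I'd have expected the check to look at the class attribute rather than test the bound method against None. Something like this sketch, which is my own guess at what such a check would need to do, not Luigi's actual code:

import luigi

def _is_external(task):
    # A bound method is never None, so the original `task.run is None`
    # only catches classes that explicitly set `run = None` (which
    # luigi.ExternalTask does). Looking at the class attribute also
    # catches tasks that simply inherit the default no-op run().
    run_attr = type(task).run
    return run_attr is None or run_attr is luigi.Task.run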
I still think that overriding .complete()
should, in theory, have been able to do this, and I'm still not sure why it doesn't. But if you're just looking for a way to bulk-transfer files after running a process, a workable solution is to have the transfer take place inside a .run()
method:
def run(self):
    # assumes a module-level `logger`, e.g. logging.getLogger('luigi-interface')
    logger.info('transferring into current directory')
    self.client.copy('current-old', 'current')
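Put together, the wrapper from the question becomes an ordinary Task whose run() does the copy once its requirements are satisfied. A minimal sketch, where TaskA, TaskB, and the S3 paths are the placeholders from the original question and the class name is made up:

import luigi
from luigi.s3 import S3Client  # moved to luigi.contrib.s3 in later releases

class TransferOutputs(luigi.Task):
    date = luigi.DateParameter()
    client = S3Client()

    def requires(self):
        yield TaskA(date=self.date)
        yield TaskB(date=self.date)

    def run(self):
        # Luigi only runs this once TaskA and TaskB report complete()
        self.client.copy('current-old', 'current')

if __name__ == "__main__":
    luigi.run()

Note that without an output() or a custom complete(), Luigi treats this task as never complete and will re-run the copy on every invocation; adding a marker output (or a date-stamped target) avoids that.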