I have a task that generates the list of files to be processed:
    class TaskA(luigi.Task):
        def run(self):
            # some code which generates list of files into output()
            pass

        def output(self):
            # LocalTarget takes the file location as `path`
            return luigi.LocalTarget(path='/path/to/process_these_files.json')
I also have a wrapper task that should run TaskA, read the values it wrote to process_these_files.json, and run a processing task for each of them:
    class RunAll(luigi.WrapperTask):
        def requires(self):
            files = json.load(TaskA().open('r'))
            for file in files:
                yield ProcessFileTask(file=file)
Any ideas how to do it?
You could use dynamic dependencies. These are dependencies that are only known at runtime. Each time you yield a dynamic dependency, the run() method will hold until that dependency is done.
For example:
    class RunAll(luigi.WrapperTask):
        def requires(self):
            return TaskA()

        def run(self):
            files = json.load(self.input().open('r'))
            for file in files:
                yield ProcessFileTask(file=file)
Also see https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies