python, pipeline, luigi

Luigi: run TaskA, then run further tasks with the parameters returned by TaskA


I have a task that generates the list of files to be processed:

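import json
import luigi

class TaskA(luigi.Task):
    def run(self):
        files = []  # placeholder for the code that generates the list of files
        with self.output().open('w') as f:
            json.dump(files, f)  # write the list into output()
    def output(self):
        return luigi.LocalTarget('/path/to/process_these_files.json')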

I also have a wrapper task that should run TaskA, read the file list from its output, and run a processing task for each value written to process_these_files.json:

class RunAll(luigi.WrapperTask):
    def requires(self):
        files = json.load(TaskA().output().open('r'))
        for file in files:
            yield ProcessFileTask(file=file)

Any ideas how to do it?


Solution

  • You could use dynamic dependencies. These are dependencies that are only known at runtime. Each time you yield a dynamic dependency from run(), the run() method is suspended until that dependency is done.

    For example:

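    import json
    import luigi

    class RunAll(luigi.WrapperTask):
        def requires(self):
            # Static dependency: TaskA finishes before run() is called
            return TaskA()

        def run(self):
            # TaskA is done, so its JSON output can be read safely here
            with self.input().open('r') as f:
                files = json.load(f)
            for file in files:
                # Each yield suspends run() until that ProcessFileTask is complete
                yield ProcessFileTask(file=file)
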

    Also see https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies