Search code examples
python, luigi

Automatic instantiation in luigi?


In `luigi.Task.run`, we have to deserialize incoming data from files/databases/etc. and serialize outgoing data back into them:

class MyTask(luigi.Task):
    """Load AnotherTask's CSV output, process it, and write our own CSV."""

    param = luigi.Parameter()

    def requires(self):
        # requires() must *return* the dependency; a bare expression is discarded.
        return AnotherTask(self.param)

    def output(self):
        # output() must *return* the target as well.
        return luigi.LocalTarget('out_{}'.format(self.param))

    def run(self):
        with self.input().open('r') as infile:
            # instantiate incoming data
            # (read_csv has no `index` argument; the date option is `parse_dates`)
            indata = pd.read_csv(infile)  # add parse_dates=[...] etc. as needed
        # my process
        outdata = indata  # placeholder: replace with the real processing
        with self.output().open('w') as outfile:
            # serialize outgoing data
            outdata.to_csv(outfile, index=False)

For convenience, I would like to skip the `pd.read_csv(...)` step, because I have to repeat the same instantiation code every time a task is reused.

Is there an automatic way to do this instantiation in luigi, something like the following?

class AnotherTask(luigi.Task):
    """Upstream task that also knows how to load its own output."""

    param = luigi.Parameter()

    def requires(self):
        ...

    def output(self):
        ...

    def _instantiate(self):
        """Read this task's CSV output with pandas and return the DataFrame."""
        # read_csv takes no `index` argument; dates are parsed via `parse_dates`.
        with self.output().open('r') as infile:
            outdata = pd.read_csv(infile)  # add parse_dates=[...] etc. as needed
        return outdata

class MyTask(luigi.Task):
    """Desired API: receive already-loaded upstream data instead of a target."""

    param = luigi.Parameter()

    def requires(self):
        # requires() must *return* the dependency; a bare expression is discarded.
        return AnotherTask(self.param)

    def output(self):
        return luigi.LocalTarget('out_{}'.format(self.param))

    def run(self):
        # desired: automatic instantiation via AnotherTask._instantiate()
        # NOTE: in plain luigi, self.input() yields the upstream target(s)
        # (hence the .open() call in the first snippet), not loaded data.
        indata = self.input()
        # my process
        outdata = indata.someprocess()
        with self.output().open('w') as outfile:
            # serialize outgoing data
            outdata.to_csv(outfile, index=False)

Solution

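  • Self-answer: give each upstream task an `instantiate()` method that loads its own output. Then add a helper that walks whatever `requires()` returns (a task, a dict, or an iterable of them, possibly nested) and calls `instantiate()` on each task: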

    def getinstances(struct):
        """Replace every task in *struct* with the result of its instantiate().

        *struct* may be a single task, a dict of them, or any other iterable of
        them, nested arbitrarily. Dicts stay dicts (same keys); every other
        iterable becomes a list.

        Raises:
            TypeError: if *struct*, or something nested in it, is not a task,
                a dict, or a non-string iterable.
        """
        if isinstance(struct, luigi.Task):
            return struct.instantiate()
        if isinstance(struct, dict):
            return {k: getinstances(v) for k, v in six.iteritems(struct)}
        # A string is iterable and each character is again a string, so without
        # this guard a string would recurse until RecursionError.
        if isinstance(struct, six.string_types):
            raise TypeError('Cannot map %s to Task/dict/list' % str(struct))
        # Remaining case: assume struct is iterable...
        try:
            items = list(struct)
        except TypeError:
            raise TypeError('Cannot map %s to Task/dict/list' % str(struct))
        return [getinstances(item) for item in items]
    
    class MyParentTask(luigi.Task):
        """Upstream task whose CSV output can be loaded back via instantiate()."""

        def requires(self):...
        def output(self):...
        def run(self):...

        def instantiate(self):
            """Read this task's output as CSV and return its rows.

            Each row is the list of strings produced by ``csv.reader``.
            """
            with self.output().open() as infile:
                # list() materializes every row before the file is closed.
                return list(csv.reader(infile))
    
    class MyChildTask(luigi.Task):
        """Downstream task that works on MyParentTask's loaded data."""

        def requires(self):
            parent = MyParentTask()
            return parent
        def output(self):...
        def run(self):
            # Hand the required task(s) to getinstances() so each one is
            # replaced by whatever its instantiate() returns.
            upstream = self.requires()
            indata = getinstances(upstream)
            ...