
Python Luigi Task structure


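from bz2 import BZ2Decompressor

from luigi import DateIntervalParameter, LocalTarget, Task
# Assumption: download() is wget.download, whose signature matches the call below
from wget import download


class Download(Task):

    date_interval = DateIntervalParameter()

    def output(self):
        return LocalTarget("data/user_{0}.tar.bz2".format(self.date_interval))

    def run(self):
        SENTENCE_URL = 'http://downloads.org/exports/user_lists.tar.bz2'
        sentence_file = download(SENTENCE_URL, out=self.output().path)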

class Uncompress(Task):

    date_interval = DateIntervalParameter()

    def output(self):
        return LocalTarget("data/user_{0}.tar".format(self.date_interval))

    def requires(self):
        return Download(self.date_interval)

    def run(self):
        with open(self.output().path, 'wb') as tar_file, open(self.input().path, 'rb') as file:
            decompressor = BZ2Decompressor()
            # read the bzip2 file in 100 KiB chunks and write the decompressed tar data
            for data in iter(lambda: file.read(100 * 1024), b''):
                tar_file.write(decompressor.decompress(data))

My first task downloads a file from the internet, and the second one uncompresses it. The next task I am about to write will read a CSV file from inside the tar file and split it into multiple files, i.e. data/file_{var}, data/file_{var2}, etc. But I believe this third task will need a date interval parameter of its own, just so it can pass it on to the other tasks.

Is there a way around this or a better way to structure my Tasks?


Solution

  • There are a few things you can do. From the docs: http://luigi.readthedocs.io/en/stable/parameters.html

    Parameters are resolved in the following order of decreasing priority:
    
    1. Any value passed to the constructor, or task level value set on the command line (applies on an instance level) 
    2. Any value set on the command line (applies on a class level) 
    3. Any configuration option (applies on a class level) 
    4. Any default value provided to the parameter (applies on a class level)
    

    At the command line, you can do things like:

    luigi Uncompress --Download-date-interval 2017-02-03
    

    to pass parameters to other tasks in the hierarchy.