
Replacing a table load function with a luigi task


I have a Python function that loads data into a SQL Server table from 2 other tables.

import pyodbc  # assumption: pyodbc as the SQL Server driver

CONN_STR = "<your connection string>"  # placeholder for the real connection string

def load_table(date1, date2):
    strDate1 = date1.strftime('%m/%d/%Y')
    strDate2 = date2.strftime('%m/%d/%Y')
    stmt = "insert into Agent_Queue (ID)  select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID  from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
    cnx1 = pyodbc.connect(CONN_STR)
    curs = cnx1.cursor()
    curs.execute(stmt)
    cnx1.commit()
    cnx1.close()

I am trying to convert this function into a Luigi task.

I tried the approach below, following the docs:

class Datetask(luigi.Task):
    def output(self):
        return luigi.DateParameter()

class loading(luigi.Task):
    def requires(self):
        return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}

    def run(self):
        date1 = dict['date1']
        date2 = dict['date2']
        strDate1 = date1.strftime('%m/%d/%Y')
        strDate2 = date2.strftime('%m/%d/%Y')
        stmt = "insert into Agent_Queue (ID)  select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID  from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
        curs=cnx1.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()

When I try to run this, I get these errors:

python -m luigi --module load_task loading --local-scheduler
DEBUG: Checking if loading() is complete
/usr/local/lib/python2.7/dist-packages/luigi/worker.py:305: UserWarning: Task loading() without outputs has no custom complete() method
  is_complete = task.complete()
WARNING: Will not run loading() or any dependencies due to error in deps() method:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 697, in _add
    deps = task.deps()
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 572, in deps
    return flatten(self._requires())
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 544, in _requires
    return flatten(self.requires())  # base impl
  File "load_task.py", line 19, in requires
    return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}
NameError: global name 'DateTask' is not defined

I am defining DateTask, so the error confuses me.

Also, does every task need all three methods: requires(), run() and output()?

Also, is it always necessary to write the output to a file? I am brand new to Luigi, so I would appreciate any input.


Solution

  • I think something like this would work better:

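    import luigi
    import pyodbc  # assumption: pyodbc as the SQL Server driver

    CONN_STR = "<your connection string>"  # placeholder for the real connection string

    class LoadTask(luigi.Task):
        date1 = luigi.DateParameter()
        date2 = luigi.DateParameter()

        def requires(self):
            return None  # first task in the chain: nothing upstream

        def output(self):
            # Marker file recording that this date range has been loaded
            return luigi.LocalTarget("{0}-{1}.txt".format(self.date1, self.date2))

        def run(self):
            strDate1 = self.date1.strftime('%m/%d/%Y')
            strDate2 = self.date2.strftime('%m/%d/%Y')
            stmt = "insert into Agent_Queue (ID)  select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID  from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
            cnx1 = pyodbc.connect(CONN_STR)
            try:
                curs = cnx1.cursor()
                curs.execute(stmt)
                cnx1.commit()
                curs.close()
            finally:
                cnx1.close()
            # Write the marker only after the insert has been committed
            with self.output().open('w') as out_file:
                out_file.write("{0} {1}\n".format(strDate1, strDate2))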
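The run() method above splices the dates into the SQL text with `%`. DB-API drivers can instead receive the values separately as query parameters, which avoids quoting and date-format issues. The minimal sketch below assumes a driver using the `?` placeholder style (pyodbc uses it; so does Python's built-in sqlite3, which stands in for SQL Server here so the example runs anywhere). The table and column names come from the question; the `demo()` helper and its sample rows are made up.

```python
import datetime as dt
import sqlite3

# Same statement as in the question, with ? placeholders instead of '%s' literals.
LOAD_SQL = """
insert into Agent_Queue (ID)
select distinct Send_Location_ID from Pretty_Txns
 where Send_Date >= ? and Send_Date <= ? and Send_Location_ID is not null
union
select distinct Pay_Location_ID from Pretty_Txns
 where Pay_Date >= ? and Pay_Date <= ? and Pay_Location_ID is not null
"""


def load_table(cnx, date1, date2):
    """Run the load for [date1, date2] on an open DB-API connection (qmark style)."""
    curs = cnx.cursor()
    try:
        curs.execute(LOAD_SQL, (date1, date2, date1, date2))
        cnx.commit()
    finally:
        curs.close()


def demo():
    """Hypothetical demo: sqlite3 in memory stands in for SQL Server."""
    sqlite3.register_adapter(dt.date, lambda d: d.isoformat())  # dates stored as ISO text
    cnx = sqlite3.connect(":memory:")
    cnx.execute("create table Pretty_Txns (Send_Location_ID int, Send_Date text,"
                " Pay_Location_ID int, Pay_Date text)")
    cnx.execute("create table Agent_Queue (ID int)")
    cnx.executemany("insert into Pretty_Txns values (?, ?, ?, ?)", [
        (1, "2016-11-01", 2, "2016-11-02"),
        (1, "2016-11-05", None, None),
        (3, "2016-12-15", 4, "2016-12-16"),  # outside the date range
    ])
    load_table(cnx, dt.date(2016, 10, 30), dt.date(2016, 11, 29))
    ids = sorted(row[0] for row in cnx.execute("select ID from Agent_Queue"))
    cnx.close()
    return ids
```

Calling `demo()` returns `[1, 2]`: location 1 appears twice as a sender but `union` keeps it once, and the December row falls outside the range.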
    

    Invoke with:

    luigi --module load_task LoadTask --date1 2017-01-01 --date2 2017-01-02 --local-scheduler
    

    Also, does every task need all three methods: requires(), run() and output()?

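    In practice, yes: run() does the work, and output() (or a custom complete() method) is how Luigi tells whether the task has already run. There are exceptions: luigi.WrapperTask does not need output(), and a task that is first in the chain can return None from requires() or leave requires() out entirely.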

    Also, is it necessary to always write the output to a file?

    No. output() can return any luigi.Target, not only a file. For example, the SQLAlchemy contrib module defines a Target subclass that lives in the database: http://luigi.readthedocs.io/en/stable/api/luigi.contrib.sqla.html