I have a Python function that loads data into a SQL Server table from 2 other tables:
    def load_table(date1,date2):
        strDate1 = date1.strftime('%m/%d/%Y')
        strDate2 = date2.strftime('%m/%d/%Y')
        stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
        cnx1= connection string
        self.curs=cnx1.cursor()
        self.curs.execute(stmt)
        self.curs.commit()
I am trying to convert this function into a Luigi task.
I tried the approach below, following the docs:
    class Datetask(luigi.Task):
        def output(self):
            return luigi.DateParameter()

    class loading(luigi.Task):
        def requires(self):
            return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}

        def run(self):
            date1 = dict['date1']
            date2 = dict['date2']
            strDate1 = date1.strftime('%m/%d/%Y')
            strDate2 = date2.strftime('%m/%d/%Y')
            stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
            curs=cnx1.cursor()
            curs.execute(stmt)
            curs.commit()
            curs.close()
When I try to run this, I get errors:
    python -m luigi --module load_task loading --local-scheduler
    DEBUG: Checking if loading() is complete
    /usr/local/lib/python2.7/dist-packages/luigi/worker.py:305: UserWarning: Task loading() without outputs has no custom complete() method
      is_complete = task.complete()
    WARNING: Will not run loading() or any dependencies due to error in deps() method:
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 697, in _add
        deps = task.deps()
      File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 572, in deps
        return flatten(self._requires())
      File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 544, in _requires
        return flatten(self.requires()) # base impl
      File "load_task.py", line 19, in requires
        return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}
    NameError: global name 'DateTask' is not defined
I am defining DateTask, so the error is confusing me.
Also, do all tasks need to have all 3 of requires(), run() and output()?
Also, is it necessary to always write the output to a file? I am brand new to Luigi, so I would appreciate any input.
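The NameError comes from a case mismatch: the class is defined as Datetask but referenced as DateTask, and Python names are case-sensitive. Beyond that, the dates can be parameters of the loading task itself rather than a separate task, so I think something like this would work better: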
    import luigi

    class LoadTask(luigi.Task):
        # The dates are parameters of the task, so no separate date task is needed.
        date1 = luigi.DateParameter()
        date2 = luigi.DateParameter()

        def requires(self):
            # First task in the chain: nothing upstream.
            return None

        def output(self):
            # Marker file; the task counts as complete once this file exists.
            return luigi.LocalTarget("{0}-{1}.txt".format(self.date1, self.date2))

        def run(self):
            strDate1 = self.date1.strftime('%m/%d/%Y')
            strDate2 = self.date2.strftime('%m/%d/%Y')
            stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
            # cnx1 is the database connection from the question (not defined here).
            curs = cnx1.cursor()
            curs.execute(stmt)
            curs.commit()
            curs.close()
            # Write the marker only after the insert has been committed.
            with self.output().open('w') as out_file:
                out_file.write('{0} {1}\n'.format(strDate1, strDate2))
Invoke with:
    luigi --module load_task LoadTask --date1 2017-01-01 --date2 2017-01-02 --local-scheduler
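As a side note, the statement in run() does not have to be built with '%s' string formatting. Here is a minimal sketch of the same insert using bound parameters. It runs against an in-memory SQLite database purely so it can be tried without SQL Server; the sample rows, the ISO date strings and the demo() helper are my assumptions, not part of the question. As far as I know pyodbc also uses ? placeholders, but check that against your driver.

```python
import sqlite3
from datetime import date

# Same statement as in the question, with ? placeholders instead of '%s'.
STMT = """
insert into Agent_Queue (ID)
select distinct Send_Location_ID from Pretty_Txns
 where Send_Date >= ? and Send_Date <= ? and Send_Location_ID is not null
union
select distinct Pay_Location_ID from Pretty_Txns
 where Pay_Date >= ? and Pay_Date <= ? and Pay_Location_ID is not null
"""


def load_table(cnx, date1, date2):
    """Insert the distinct location IDs seen between date1 and date2."""
    # ISO strings are an assumption that suits SQLite's text comparison.
    d1, d2 = date1.isoformat(), date2.isoformat()
    curs = cnx.cursor()
    try:
        curs.execute(STMT, (d1, d2, d1, d2))
    finally:
        curs.close()
    cnx.commit()


def demo():
    """Hypothetical demo: SQLite stands in for SQL Server, with made-up rows."""
    cnx = sqlite3.connect(":memory:")
    cnx.execute(
        "create table Pretty_Txns "
        "(Send_Location_ID, Send_Date, Pay_Location_ID, Pay_Date)")
    cnx.execute("create table Agent_Queue (ID)")
    cnx.executemany("insert into Pretty_Txns values (?, ?, ?, ?)", [
        (1, "2016-11-01", 2, "2016-11-02"),
        (1, "2016-11-05", None, None),
        (3, "2016-12-15", 4, "2016-12-16"),  # outside the date range
    ])
    load_table(cnx, date(2016, 10, 30), date(2016, 11, 29))
    return sorted(row[0] for row in cnx.execute("select ID from Agent_Queue"))


if __name__ == "__main__":
    print(demo())
```

Inside the Luigi task you would call something like load_table(cnx1, self.date1, self.date2) from run() and write the marker file afterwards.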
Also, do all tasks need to have all 3 of requires(), run() and output()?
Yes. There are exceptions, though: task types such as luigi.WrapperTask do not require output(), and you can return None from requires() if the task is the first one in the chain.
Also, is it necessary to always write the output to a file?
No. For example, the SQLAlchemy contrib module defines a Target subclass (SQLAlchemyTarget) that you can use as a target in the database instead of a file: http://luigi.readthedocs.io/en/stable/api/luigi.contrib.sqla.html
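To illustrate what a target in the database means, here is a rough sketch of the idea. This is not Luigi's API: MarkerTarget, the task_markers table and the update_id string are all hypothetical names, and SQLite is used only so the example runs on its own. The point is that a target only has to answer "does this output exist?", and a marker row can answer that just as well as a file.

```python
import sqlite3


class MarkerTarget(object):
    """Hypothetical stand-in for a database-backed target (not Luigi's API)."""

    def __init__(self, cnx, update_id):
        self.cnx = cnx
        self.update_id = update_id
        # One row per completed task run, keyed by an identifier for that run.
        self.cnx.execute(
            "create table if not exists task_markers "
            "(update_id text primary key)")

    def exists(self):
        # The task counts as complete once its marker row is present.
        row = self.cnx.execute(
            "select 1 from task_markers where update_id = ?",
            (self.update_id,)).fetchone()
        return row is not None

    def touch(self):
        # Record completion; call this after the load has succeeded.
        self.cnx.execute(
            "insert or ignore into task_markers (update_id) values (?)",
            (self.update_id,))
        self.cnx.commit()


cnx = sqlite3.connect(":memory:")
target = MarkerTarget(cnx, "LoadTask_2017-01-01_2017-01-02")
before = target.exists()   # no marker row yet
target.touch()
after = target.exists()    # marker row written
```

With a real target of this kind, output() would return the target object and run() would mark it as done once the insert has been committed, in place of writing the text file.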