I'm trying to create Luigi tasks dynamically (based on the entries in cmdList) and make each preceding task the dependency of the next task. SQLTask is a subclass of luigi.Task. However, when I run this code, I get the message "This progress looks :| because there were tasks that were not granted run permission by the scheduler".
What am I missing?
class BDX_Task(SQLTask):
    """
    Task that runs a single shell command and then touches its target.

    Parameters:
        acctDate: Accounting period string (looks like YYYYMM in the
            callers -- TODO confirm).
        ssisDate: Optional date string; defaults to None.
        queryKey: Identifier of the query this task runs.
        queryCmd: Command line that is handed to the shell.
        runDesc: Free-text description of the run.
        dependQry: Key of the query this one depends on.
    """
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()

    def run(self):
        """
        Execute ``queryCmd`` and touch the target only if it succeeded.

        Raises:
            subprocess.CalledProcessError: If the command exits with a
                non-zero status.
        """
        # NOTE(review): shell=True executes queryCmd through the shell; keep
        # queryCmd restricted to trusted, internally built command strings.
        return_code = subprocess.call(self.queryCmd, shell=True)
        print(return_code)
        # Fail the task instead of touching the target after a failed command;
        # otherwise a broken query would still be recorded as finished.
        if return_code != 0:
            raise subprocess.CalledProcessError(return_code, self.queryCmd)
        # get_target() comes from SQLTask (not visible here); presumably
        # touching it is what marks this task complete -- TODO confirm.
        self.get_target().touch()
def dep_s_dep(cmdList, dep1):
    """
    Return the cmdList entry that describes the dependency task ``dep1``.

    Args:
        cmdList: Iterable of ``(key, cmd, dep)`` triples.
        dep1: Key of the entry to look up.

    Returns:
        The first ``(key, cmd, dep)`` tuple whose key equals ``dep1``.

    Raises:
        IndexError: If no entry in ``cmdList`` has the key ``dep1``.
    """
    # Stop at the first match instead of materialising every match.
    for key, cmd, dep in cmdList:
        if key == dep1:
            return (key, cmd, dep)
    # Same exception type as indexing an empty result list, but with a
    # message that names the key that could not be found.
    raise IndexError(f"no entry with key {dep1!r} in cmdList")
class BDX_Query_0XX(SQLTask):
    """
    Create one BDX_Task subclass per cmdList entry and yield the instances.

    Each cmdList entry is ``(queryKey, queryCmd, dependQry)``; a subclass of
    BDX_Task named after ``queryKey`` is created and published in the module
    globals, and its ``requires`` returns an instance of the class named by
    ``dependQry`` when there is one.

    Parameters:
        acctDate: Accounting period string; characters 0-3 are used as the
            year and 4-5 as the month.
        ssisDate: Date string forwarded unchanged to every generated task.
        runDesc: Free-text description forwarded to every generated task.
    """
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()
    runDesc = luigi.Parameter()
    depend_task = ""  # not read by this class; kept so the attribute still exists

    def run(self):
        """
        Build the generated tasks, yield them to the scheduler, then touch the target.

        Raises:
            IndexError: From dep_s_dep when ``dependQry`` names a key that is
                not present in cmdList.
            KeyError: When ``dependQry`` names a class that has not been
                published in globals() yet (entries must be listed after the
                entry they depend on).
        """
        YY = self.acctDate[:4]
        MM = self.acctDate[4:6]
        acctDate = self.acctDate
        bdx_sql = r'r:\\1.SQL\\BDX_SQL\\'
        cmdList = [
            ('BDX010',f'{bdx_sql}BDX_001_NI_DM 010.sql -o output010.txt',None),
            ('BDX020',f'{bdx_sql}BDX_001_NI_DM 020.sql -o output020.txt','BDX010'),
            ('BDX022a',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql -o output022a.txt','BDX020'),
            ('BDX022b',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql -o output022b.txt -v Year1={YY} MM={MM}','BDX022a'),
            ('BDX022c',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql -o output022c.txt -v Year={YY} Month={MM}', 'BDX022b'),
            ('BDX023',f'{bdx_sql}BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt ','BDX020'),
            ('BDX024',f'{bdx_sql}BDX_024_P031_ITD_All_Final_CatAdj.sql -o output024.txt','BDX020'),
            ('BDX025a',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 010.sql -o output025a.txt','BDX020'),
            ('BDX025b',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 020.sql -o output025b.txt -v Year={YY} Month={MM}','BDX025a'),
            ('BDX025c',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 030.sql -o output025c.txt -v YYMM={acctDate}','BDX025b')
        ]
        tasks = []
        for queryKey, queryCmd, dependQry in cmdList:
            klass = type(queryKey, (BDX_Task,), {})
            # Truthiness covers both "no dependency" spellings: the None used
            # in cmdList above and the empty string. Comparing against ''
            # alone would send None into dep_s_dep, which finds no entry.
            if dependQry:
                # info about dependency task (key, cmd, dep's dep)
                dep1 = dep_s_dep(cmdList, dependQry)
                print(f"{queryKey}'s dep1", dep1)
                depend_task = [globals()[dependQry](acctDate=self.acctDate,
                                                    ssisDate=self.ssisDate,
                                                    queryKey=dep1[0],
                                                    queryCmd=dep1[1],
                                                    runDesc=self.runDesc,
                                                    dependQry=dep1[2])]

                # Bind this iteration's list as a default argument. requires()
                # is only called after the loop has finished; a plain closure
                # would read the loop variable at that point, so every
                # generated class would return the dependency built for the
                # LAST cmdList entry.
                def requires1(cls, _depend_task=depend_task):
                    return _depend_task

                setattr(klass, "requires", classmethod(requires1))
            # make the class available at the module level
            globals()[queryKey] = klass
            tasks.append(klass(acctDate=self.acctDate,
                               ssisDate=self.ssisDate,
                               queryKey=queryKey,
                               queryCmd=queryCmd,
                               runDesc=self.runDesc,
                               dependQry=dependQry))
        # Yielding from run() hands the instances to Luigi as dynamic dependencies.
        yield tasks
        # get_target() comes from SQLTask (not visible here); presumably
        # touching it marks this task complete -- TODO confirm.
        self.get_target().touch()
===========Stack trace
C:\ProgramData\Anaconda3\python.exe R:/1.PY/DataPipeLine/run_BDX_process.py BDX_Query_Main --local-scheduler
DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_9XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1) is complete
INFO: Informed scheduler that task BDX_Query_Main_201904_201905_444c47aebc has status PENDING
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1) is complete
BDX020's dep1 ('BDX010', '"r:\\1.SQL\\BDX_SQL\\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ', '')
BDX022a's dep1 ('BDX020', '"r:\\1.SQL\\BDX_SQL\\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX022b's dep1 ('BDX022a', '"r:\\1.SQL\\BDX_SQL\\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ', 'BDX020')
BDX022c's dep1 ('BDX022b', '"r:\\1.SQL\\BDX_SQL\\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04', 'BDX022a')
BDX023's dep1 ('BDX020', '"r:\\1.SQL\\BDX_SQL\\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX024's dep1 ('BDX020', '"r:\\1.SQL\\BDX_SQL\\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX025a's dep1 ('BDX020', '"r:\\1.SQL\\BDX_SQL\\BDX_001_NI_DM 020.sql" ', 'BDX010')
BDX025b's dep1 ('BDX025a', '"r:\\1.SQL\\BDX_SQL\\BDX_025_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ', 'BDX020')
BDX025c's dep1 ('BDX025b', '"r:\\1.SQL\\BDX_SQL\\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04', 'BDX025a')
INFO: Informed scheduler that task BDX_Query_9XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
INFO: Informed scheduler that task BDX_Query_0XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 6820] Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) running BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
INFO: [pid 6820] Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) new requirements BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=) is complete
INFO: Informed scheduler that task BDX010_201904___r__1_SQL_BDX_SQ_c7c8473ba5 has status DONE
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=201904 Luigi test1, dependQry=BDX010) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX020_201904_BDX010__r__1_SQL_BDX_SQ_573b857d50 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_7a4a9cc485 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04, runDesc=201904 Luigi test1, dependQry=BDX022a) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_313dc66c50 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX022b) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_d198713a82 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX023(acctDate=201904, ssisDate=201905, queryKey=BDX023, queryCmd="r:\1.SQL\BDX_SQL\BDX_023_P031_MTD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX023_201904_BDX020__r__1_SQL_BDX_SQ_236e57639e has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX024(acctDate=201904, ssisDate=201905, queryKey=BDX024, queryCmd="r:\1.SQL\BDX_SQL\BDX_024_P031_ITD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX024_201904_BDX020__r__1_SQL_BDX_SQ_1a8ad5a673 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025a(acctDate=201904, ssisDate=201905, queryKey=BDX025a, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025a_201904_BDX020__r__1_SQL_BDX_SQ_91bd598abf has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
DEBUG: Checking if BDX025c(acctDate=201904, ssisDate=201905, queryKey=BDX025c, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 030.sql" -v YYMM=201904, runDesc=201904 Luigi test1, dependQry=BDX025b) is complete
DEBUG: Checking if BDX025b(acctDate=201904, ssisDate=201905, queryKey=BDX025b, queryCmd="r:\1.SQL\BDX_SQL\BDX_025_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX025a) is complete
INFO: Informed scheduler that task BDX025c_201904_BDX025b__r__1_SQL_BDX_SQ_c98f5f14c3 has status PENDING
INFO: Informed scheduler that task BDX025b_201904_BDX025a__r__1_SQL_BDX_SQ_eef5a35ad5 has status PENDING
INFO: Informed scheduler that task BDX_Query_0XX_201904_201904_Luigi_tes_201905_db874019d2 has status PENDING
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 12 pending tasks possibly being run by other workers
DEBUG: There are 12 pending tasks unique to this worker
DEBUG: There are 12 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=931855678, workers=1, host=LWVPWEACT001, username=i805649, pid=6820) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 14 tasks of which:
* 1 complete ones were encountered:
- 1 BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=)
* 13 were left pending, among these:
* 1 were missing external dependencies:
- 1 BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
* 2 had missing dependencies:
- 1 BDX_Query_9XX(acctDate=201904, ssisDate=201905, runDesc=201904 Luigi test1)
- 1 BDX_Query_Main(acctDate=201904, ssisDate=201905)
* 10 was not granted run permission by the scheduler:
- 1 BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=201904 Luigi test1, dependQry=BDX010)
- 1 BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=201904 Luigi test1, dependQry=BDX020)
- 1 BDX022b(...)
- 1 BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=201904 Luigi test1, dependQry=BDX022b)
- 1 BDX023(acctDate=201904, ssisDate=201905, queryKey=BDX023, queryCmd="r:\1.SQL\BDX_SQL\BDX_023_P031_MTD_All_Final_CatAdj.sql" , runDesc=201904 Luigi test1, dependQry=BDX020)
...
Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler
===== Luigi Execution Summary =====
Process finished with exit code 0
I think you are making this more complicated than it needs to be. Firstly, dynamic dependencies are useful when the full list of tasks isn't known until runtime. For example, you might have a situation where you have to run a task to query a database, and for each row returned by your query you require a new dependency.
This is distinctly different use case from creating a set of tasks and their dependencies programmatically, which is what it feels like you are doing in your example.
The following toy code shows how you could achieve what you are trying to do:
import luigi
import datetime
import logging
logger = logging.getLogger('luigi-interface')
# Maps each task name to [command, name of the task it depends on].
# An empty dependency name ('') means the task has no dependency.
task_list = {
'taskA': ['taskA_command', ''],
'taskB': ['taskB command', 'taskA'],
'taskC': ['taskC command', 'taskA'],
'taskD': ['taskD command', 'taskB'],
}
# Equivalent of your BDX_Task class
class MyTask(luigi.Task):
    """
    One entry of ``task_list`` expressed as a Luigi task.

    Parameters:
        task_date: Date used in the output path.
        task_name: Name of this task; also the output file stem.
        task_command: Text written to the output file by ``run``.
        dependent_task_name: Name of the ``task_list`` entry this task
            requires, or '' when it requires nothing.
    """

    task_date = luigi.DateParameter()
    task_name = luigi.Parameter()
    task_command = luigi.Parameter()
    dependent_task_name = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        """Initialise the task and log which task_name was instantiated."""
        super(MyTask, self).__init__(*args, **kwargs)
        message = 'MyTask.__init__ called for task_name="{}"'.format(self.task_name)
        logger.debug(message)

    def output(self):
        """Return the local file target for this task's date and name."""
        pattern = 'output_files/{date:%Y%m%d}/{name}.output'
        path = pattern.format(date=self.task_date, name=self.task_name)
        return luigi.LocalTarget(path)

    def requires(self):
        """
        Return the upstream task as a one-element list, or an empty list.

        The upstream task is built from the ``task_list`` entry named by
        ``dependent_task_name``; nothing is required when that name is ''
        or is not a key of ``task_list``.
        """
        upstream_name = self.dependent_task_name
        if upstream_name == '' or upstream_name not in task_list:
            return []

        upstream_command, upstream_dependency = task_list[upstream_name]
        upstream = self.__class__(
            task_date=self.task_date,
            task_name=upstream_name,
            task_command=upstream_command,
            dependent_task_name=upstream_dependency,
        )
        return [upstream]

    def run(self):
        """Write the command text to this task's output target."""
        with self.output().open('w') as out_file:
            line = 'Command to run: "{cmd}"'.format(cmd=self.task_command)
            out_file.write(line)
# Equivalent of your BDX_Query_0XX class
class myWrapperTask(luigi.WrapperTask):
    """
    Wrapper that requires one MyTask per entry of ``task_list``.

    Parameters:
        task_date: Date forwarded to every MyTask; the default is today's
            date as evaluated when this class body is executed.
    """

    task_date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        """Yield a MyTask for every (name, [command, dependency]) entry."""
        for name, entry in task_list.items():
            command, upstream_name = entry
            yield MyTask(
                task_date=self.task_date,
                task_name=name,
                task_command=command,
                dependent_task_name=upstream_name,
            )
Output generated
$ PYTHON_PATH=.:$PYTHON_PATH && python -m luigi --module dynamic_dependencies myWrapperTask --local-scheduler --log-level INFO
MyTask.__init__ called for task_name="taskA"
MyTask.__init__ called for task_name="taskB"
MyTask.__init__ called for task_name="taskC"
MyTask.__init__ called for task_name="taskD"
INFO: Informed scheduler that task myWrapperTask_2019_04_12_c2195ac5bd has status PENDING
INFO: Informed scheduler that task MyTask_taskB_taskD_command_2019_04_12_6afc23b3fe has status PENDING
INFO: Informed scheduler that task MyTask_taskA_taskC_command_2019_04_12_35a27fe401 has status PENDING
INFO: Informed scheduler that task MyTask_taskA_taskB_command_2019_04_12_d66dc54b89 has status PENDING
INFO: Informed scheduler that task MyTask__taskA_command_2019_04_12_bdb7812ab5 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=)
INFO: Informed scheduler that task MyTask__taskA_command_2019_04_12_bdb7812ab5 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskB, task_command=taskB command, dependent_task_name=taskA)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskB, task_command=taskB command, dependent_task_name=taskA)
INFO: Informed scheduler that task MyTask_taskA_taskB_command_2019_04_12_d66dc54b89 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskD, task_command=taskD command, dependent_task_name=taskB)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskD, task_command=taskD command, dependent_task_name=taskB)
INFO: Informed scheduler that task MyTask_taskB_taskD_command_2019_04_12_6afc23b3fe has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running MyTask(task_date=2019-04-12, task_name=taskC, task_command=taskC command, dependent_task_name=taskA)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done MyTask(task_date=2019-04-12, task_name=taskC, task_command=taskC command, dependent_task_name=taskA)
INFO: Informed scheduler that task MyTask_taskA_taskC_command_2019_04_12_35a27fe401 has status DONE
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) running myWrapperTask(task_date=2019-04-12)
INFO: [pid 35037] Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) done myWrapperTask(task_date=2019-04-12)
INFO: Informed scheduler that task myWrapperTask_2019_04_12_c2195ac5bd has status DONE
INFO: Worker Worker(salt=526551035, workers=1, host=ChrisPalmersMBP.localdomain, username=cpalmer, pid=35037) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 5 ran successfully:
- 4 MyTask(task_date=2019-04-12, task_name=taskA, task_command=taskA_command, dependent_task_name=) ...
- 1 myWrapperTask(task_date=2019-04-12)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
The major structural difference from your code is that my `task_list` is defined outside the task classes. It's possible that you simplified things for SO, and that your `cmdList` is really going to be the output from another task and can't be defined outside the classes. You could solve that by just adding the list to `globals()` once it has been generated, or you could pass the full list of commands to `MyTask`/`BDX_Task` as a parameter so it can be referenced in `MyTask.requires()` (maybe not the best idea if the list is potentially large). Also, as in your original code, you then couldn't use a `luigi.WrapperTask` like my example does.