How to fix "luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task'>" when trying to execute luigi tasks?


I am new to Luigi and have created a pipeline that gets data from a database, transforms it, and loads it back into the database. The pipeline consists of four tasks. However, when I execute the task from cmd or PyCharm, it says that it cannot schedule a non-task. Below is pseudocode for my pipeline. The parameters of each task are not passed in as inputs; they are read from other files.

    import luigi

    class Task1(luigi.Task):
        # Some parameters (read from other files)
        def get_target(self): ...
        def query(self): ...
        def run(self): ...

    class Task2(luigi.Task):
        # Some parameters
        def requires(self):
            return Task1()
        def func1(self): ...
        def func2(self): ...
        def run(self): ...

    class Task3(luigi.Task):
        # Some parameters
        def requires(self):
            return Task2()
        def run(self): ...

    class Task4(luigi.Task):
        # Some parameters
        def requires(self):
            return Task3()
        def run(self): ...

In PyCharm, I used

    if __name__ == '__main__':
        luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)

and from cmd, I used

    python .\folder\file.py Task1

but it gave me this error:

    INFO: Worker Worker was stopped. Shutting down Keep-Alive thread
    Traceback (most recent call last):
      File "D:/folder/file.py", line 300, in <module>
        luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)
      File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 237, in build
        luigi_run_result = _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)
      File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 171, in _schedule_and_run
        success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
      File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 740, in add
        self._validate_task(task)
      File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 638, in _validate_task
        raise TaskException('Can not schedule non-task %s' % task)
    luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task1'>

Solution

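  • First, you need to define an output() for each of your tasks. By default, luigi decides whether a task is complete by checking whether its output targets exist, so a task without outputs is never considered complete and the tasks that depend on it cannot run. Second (and this is where your actual problem is), you are passing the task classes instead of task instances. You just need to instantiate them, so try: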

    luigi.build([Task1(), Task2(), Task3(), Task4()], workers=5, local_scheduler=True)
    

    However, I should point out a couple of other things:

    1) Since each of your tasks specifies the task it depends on, you only need to tell luigi to run the last task in the chain:

    luigi.build([Task4()], workers=5, local_scheduler=True)
    

    This tells luigi that it needs to complete Task4. To do so, luigi looks at what Task4 requires and finds Task3; it then checks what Task3 requires and finds Task2, and so on. Luigi builds the dependency graph for you and runs the tasks in an order that satisfies each task's dependencies. Tasks whose outputs already exist are considered complete and are not run again.

    2) The order of the tasks in the list you pass to luigi.build has little impact: luigi decides the execution order from the dependency graph and task priority, not from the order in which you list them.

    Edit: If a task depends on several other tasks, requires() can simply return a list:

    class Task4(luigi.Task):
        def requires(self):
            return [Task1(), Task2(), Task3()]