Tags: python, python-3.6, luigi

Python Luigi: requires() can not return Target objects


I'm new to Luigi and I would like to use it to run my API calls.

I'm using MockFile targets because the JSON objects I retrieve through the API are small and I want to avoid using an external database.

This is my code:

import luigi 
from luigi import Task, run as runLuigi, mock as LuigiMock
import yaml

class getAllCountries(Task):
    task_complete = False

    def requires(self):
        return LuigiMock.MockFile("allCountries")

    def run(self):
        sync = Sync()
        # Get list of all countries
        countries = sync.getAllCountries()
        if(countries is None or len(countries) == 0):
            Logger.error("Sync terminated. The country array is null")

        object_to_send = yaml.dump(countries)

        _out = self.output().open('r')
        _out.write(object_to_send)
        _out.close()

        task_complete = True


    def complete(self):
        return self.task_complete


class getActiveCountries(Task):

    task_complete = False 

    def requires(self):
        return getAllCountries()

    def run(self):
        _in = self.input().read('r')
        serialised = _in.read()
        countries = yaml.load(serialised)

        doSync = DoSync()
        activeCountries = doSync.getActiveCountries(countries)  
        if(activeCountries is None or len(activeCountries) == 0):
            Logger.error("Sync terminated. The active country account array is null")        

        task_complete = True 

    def complete(self):
        return self.task_complete

if __name__ == "__main__":
    runLuigi()

I'm running the project with the following command:

 PYTHONPATH='.' luigi --module app getActiveCountries --workers 2 --local-scheduler

It fails with the following stack trace:

DEBUG: Checking if getActiveCountries() is complete
DEBUG: Checking if getAllCountries() is complete
INFO: Informed scheduler that task   getActiveCountries__99914b932b   has status   PENDING
ERROR: Luigi unexpected framework error while scheduling getActiveCountries()
Traceback (most recent call last):
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 763, in add
    for next in self._add(item, is_complete):
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 861, in _add
    self._validate_dependency(d)
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 886, in _validate_dependency
    raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
INFO: Worker Worker(salt=797067816, workers=2, host=xxx, pid=85795) was stopped. Shutting down Keep-Alive thread
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/retcodes.py", line 75, in run_with_retcodes
    worker = luigi.interface._run(argv).worker
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/interface.py", line 211, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/interface.py", line 171, in _schedule_and_run
    success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 763, in add
    for next in self._add(item, is_complete):
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 861, in _add
    self._validate_dependency(d)
    File "/Users/thibaultlr/anaconda3/envs/testThib/lib/python3.6/site-packages/luigi/worker.py", line 886, in _validate_dependency
    raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class

Also, I'm running luigid in the background, and I don't see any of these tasks on it, whether they failed or not.

Any ideas?


Solution

  • First, you are not seeing anything happen in the luigi daemon because your command passes --local-scheduler. That flag bypasses the daemon entirely and runs the scheduler inside the local process.

    Second, in the getAllCountries task you return a Target from requires(), when it belongs in output(). requires() must return tasks, which is exactly what the error message is complaining about. Once you change it from:

    def requires(self):
        return LuigiMock.MockFile("allCountries")
    

    to:

    def output(self):
        return LuigiMock.MockFile("allCountries")
    

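    you no longer need to override complete() or set task_complete to True. Luigi's default complete() checks whether the task's output() target exists, so the task counts as done once the output file has been written. (Also note that task_complete = True inside run() only assigns a local variable, so self.task_complete never changed in your version.) To find out more about targets, take a look at: https://luigi.readthedocs.io/en/stable/workflows.html#target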

    Side note: You can make this section:

    _out = self.output().open('w')  # 'w' opens the target for writing
    _out.write(object_to_send)
    _out.close()
    

    simpler and less error-prone by using Python's with statement:

    with self.output().open('w') as _out:
        _out.write(object_to_send)
    

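    Python closes the target for you when the with block exits, so you can't forget the close() call. With Luigi's file targets, an exception inside the block generally means the output is not committed, so a failed run doesn't leave a half-written target behind.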

    Second side note: don't call luigi.run() from your script; Luigi discourages it. Use luigi.build() instead.