
Luigi tasks going into an infinite loop


I have a simple Luigi task whose run method yields another instance of itself with a different parameter, as shown below.

import luigi

class ComputeJob(luigi.Task):

    id_parameter = luigi.parameter.IntParameter()

    # run definition
    def run(self):
        print("\nrunning task {}".format(self.id_parameter))
        # Do some work here

        # Dynamically yield the next task until the counter reaches 10
        if self.id_parameter < 10:
            next_val = self.id_parameter + 1
            yield ComputeJob(id_parameter=next_val)

I expect it to run 10 times and then exit, but after the 10th iteration it starts re-executing earlier steps, and the tasks end up looping between steps 9 and 10.

So the expected output should be:

running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10

But the output that I get is:

running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
running task 9
running task 10
running task 9
running task 10
...
...
...

What am I missing here?

Thanks, Oyshik


Solution

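  • ComputeJob defines no output(), and Luigi's default complete() treats a task without outputs as never complete. In addition, whenever run() yields a task that is not yet complete, run() is started again from the top once that task has finished. As far as I can tell, this is why task 9 keeps re-yielding task 10, which is never considered done. If you want to 'do away with the output altogether', as you stated in your follow-up comment, I would suggest looking at the documentation for the complete method and overriding it.

    Another option is to create an additional wrapper task that runs your ComputeJob task as many times as you want.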

    import luigi
    
    class ComputeJob(luigi.Task):
        id_parameter = luigi.parameter.IntParameter()
        # In-memory completion flag; it only lives as long as this process
        done = False

        # run definition
        def run(self):
            print("\nrunning task {}".format(self.id_parameter))
            # Do some work here
            self.done = True

        def complete(self):
            # Report completion from the flag instead of checking an output target
            return self.done

    class RunComputeJobs(luigi.WrapperTask):
        def requires(self):
            # range(1, 11) yields ids 1 through 10
            for i in range(1, 11):
                yield ComputeJob(id_parameter=i)