I have a simple Luigi Task whose run method yields the same task with a different parameter, as shown below.
import luigi

class ComputeJob(luigi.Task):
    id_parameter = luigi.parameter.IntParameter()

    # run definition
    def run(self):
        print("\nrunning task {}".format(self.id_parameter))
        # Do some work here
        if self.id_parameter < 10:
            next_val = self.id_parameter + 1
            yield ComputeJob(id_parameter=next_val)
I expect it to run 10 times and then exit, but after executing the 10th iteration it starts re-executing earlier steps. As a result, the tasks keep looping between steps 9 and 10.
So the expected output is:
running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
But the output that I get is:
running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
running task 9
running task 10
running task 9
running task 10
...
...
...
What am I missing here?
Thanks, Oyshik
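I would suggest you look at the documentation for the complete method if you want to 'do away with the output all together', as you stated in your follow-up comment. By default, Luigi treats a task as complete only when all of its outputs exist, so a task with no outputs and no custom complete method is never considered complete.
Another option would be to create an additional wrapper task that runs the ComputeJob task you've created as many times as you want: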
import luigi

class ComputeJob(luigi.Task):
    id_parameter = luigi.parameter.IntParameter()
    done = False  # in-memory completion flag, set once run() finishes

    # run definition
    def run(self):
        print("\nrunning task {}".format(self.id_parameter))
        # Do some work here
        self.done = True

    # Luigi calls complete() to decide whether the task still needs to run
    def complete(self):
        return self.done

class RunComputeJobs(luigi.WrapperTask):
    def requires(self):
        # range(1, 11) yields 1 through 10, matching the ten runs expected
        for i in range(1, 11):
            yield ComputeJob(id_parameter=i)