Workers dying early due to uneven work distribution in Luigi (2.6.1)

We are trying to run a simple pipeline distributed on a Docker Swarm cluster. The Luigi workers are deployed as replicated Docker services. They start successfully, but after a few seconds of asking luigi-server for work they begin to die because no work is assigned to them, and all tasks end up assigned to a single worker.

We had to set keep_alive=True in the luigi.cfg of our workers to stop them from dying, but keeping workers around after the pipeline is done seems like a bad idea. Is there a way to control the work distribution?

Our test pipeline:

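import subprocess

import luigi


class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i, self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return luigi.LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def cmd(self):
        # Build the shell command; the closing .format() call is reconstructed.
        return """
               docker run --rm --name example_{number} hello-world
               """.format(number=self.number)

    def run(self):
        # cmd is a method, so it must be called to get the command string.
        out = subprocess.check_output(self.cmd(), stderr=subprocess.STDOUT,
                                      shell=True, universal_newlines=True)
        with self.output().open('w') as f:
            # The body of this block is missing in the source; writing the
            # captured output is an assumption.
            f.write(out)

    def output(self):
        return luigi.LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    # The body is missing in the source; luigi.run() is an assumed entry point.
    luigi.run()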

  • Your issue is the result of yielding a single requirement at a time. Instead, yield all of them at once, as follows:

    def requires(self):
        reqs = []
        for i in range(self.tasks):
            reqs.append(RunExampleTask(i, self.sleep_time))
        yield reqs
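
    To make the difference between the two shapes concrete, here is a minimal plain-Python sketch. It does not import Luigi and does not model how Luigi's scheduler treats either form; it only shows what a consumer receives from each generator. The function names and the tuples standing in for task objects are hypothetical.

```python
def one_at_a_time(n):
    """Yield each requirement separately: the consumer sees n items."""
    for i in range(n):
        # Hypothetical stand-in for RunExampleTask(i, sleep_time).
        yield ("RunExampleTask", i)


def all_at_once(n):
    """Yield a single list holding every requirement: the consumer sees one batch."""
    reqs = []
    for i in range(n):
        reqs.append(("RunExampleTask", i))
    yield reqs


steps_single = list(one_at_a_time(3))
steps_batch = list(all_at_once(3))

print(len(steps_single))  # 3 separate items
print(len(steps_batch))   # 1 item, which is the whole list
```

    Both forms carry the same requirements; only the grouping differs, which is the change the answer proposes.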