Tags: apache-spark, luigi

Luigi doesn't work as expected with Spark & Redshift


I'm running a Spark cluster on EMR (which uses YARN) and running Luigi tasks directly from the EMR master. I have a chain of jobs that depends on data in S3 and, after a few SparkSubmitTasks, eventually ends up in Redshift.

import luigi
import luigi.format
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.redshift import RedshiftTarget


class SomeSparkTask(SparkSubmitTask):

    # Stored in /etc/luigi/client.cfg
    host = luigi.Parameter(default='host')
    database = luigi.Parameter(default='database')
    user = luigi.Parameter(default='user')
    password = luigi.Parameter(default='password')
    table = luigi.Parameter(default='table')

    <add-more-params-here>

    app = '<app-jar>.jar'
    entry_class = '<path-to-class>'

    def app_options(self):
        return <list-of-options>

    def output(self):
        return RedshiftTarget(host=self.host, database=self.database, user=self.user, password=self.password,
                              table=self.table, update_id=<some-unique-identifier>)

    def requires(self):
        return AnotherSparkSubmitTask(<params>)

I'm running into two main problems:

1) Sometimes Luigi isn't able to determine when a SparkSubmitTask is done. For example, Luigi submits a job, and YARN reports the application as running; but once the application finishes, Luigi just hangs and never registers that the job is done (see the first sketch after this list).

2) If for whatever reason the SparkSubmitTasks do run and the Spark job for the task above finishes, the output is never written: the marker table is never created nor populated. However, the actual table is created by the Spark job that runs. Am I misunderstanding how I'm supposed to call the RedshiftTarget? (See the second sketch after this list.)
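A sketch for problem 1 (not part of the original question): as far as I can tell, SparkSubmitTask just launches spark-submit as a local process and waits for it to exit, so whether Luigi notices completion depends on how spark-submit itself behaves under YARN (client vs. cluster deploy mode, and spark.yarn.submit.waitAppCompletion in cluster mode). Below is a minimal sketch of pinning those settings on the task; the jar, class, and mode values are illustrative, and master/deploy_mode/conf are assumed to be the SparkSubmitTask properties that control this.

from luigi.contrib.spark import SparkSubmitTask


class SomeSparkTask(SparkSubmitTask):
    app = 'some-app.jar'             # illustrative placeholder
    entry_class = 'com.example.App'  # illustrative placeholder

    # Run the driver from the EMR master (client mode) so the local
    # spark-submit process that Luigi is watching only exits once the
    # application itself has finished.
    master = 'yarn'
    deploy_mode = 'client'

    # Alternatively, in cluster mode, keep spark-submit blocking until
    # the YARN application completes instead of returning right after
    # submission.
    # conf = {'spark.yarn.submit.waitAppCompletion': 'true'}

If I'm reading the contrib module correctly, the same master/deploy-mode settings can also live in the [spark] section of /etc/luigi/client.cfg instead of on the task class.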
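For problem 2, my reading of the contrib code (an assumption, not something stated in the question): Luigi decides a task is complete by calling output().exists(), and for RedshiftTarget that means a row in Luigi's marker table with the matching update_id. SparkSubmitTask.run() only runs spark-submit and never touches the target, so the marker table never gets written even though the Spark job creates the real table. A minimal sketch of writing the marker yourself after the job finishes; the parameter values, jar/class names, and the choice of task_id as update_id are illustrative.

import luigi
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.redshift import RedshiftTarget


class SomeSparkTask(SparkSubmitTask):
    # Same parameters as in the question (values come from /etc/luigi/client.cfg).
    host = luigi.Parameter(default='host')
    database = luigi.Parameter(default='database')
    user = luigi.Parameter(default='user')
    password = luigi.Parameter(default='password')
    table = luigi.Parameter(default='table')

    app = 'some-app.jar'             # illustrative placeholder
    entry_class = 'com.example.App'  # illustrative placeholder

    def output(self):
        # update_id must be unique per run of this task; task_id is one
        # convenient (illustrative) choice.
        return RedshiftTarget(host=self.host, database=self.database,
                              user=self.user, password=self.password,
                              table=self.table, update_id=self.task_id)

    def run(self):
        # spark-submit does the real work ...
        super(SomeSparkTask, self).run()
        # ... and only then do we write the marker row; without this,
        # output().exists() never becomes true and Luigi keeps treating
        # the task as incomplete.
        self.output().touch()

With the touch() in place, the next run of the chain should see the marker row for that update_id and skip the task instead of re-submitting it.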

In the meantime I'm trying to get acquainted with the source code.

Thanks!


Solution

  • Dropped the use of Luigi in my Spark applications: all my data is now streamed into S3, and I only need one large monolithic application to run all my Spark aggregations, which lets me take advantage of intermediate results and caching (see the sketch below).
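As an illustration of the monolithic approach described above (not part of the original answer), caching an intermediate DataFrame inside a single Spark application lets several aggregations reuse the same computed data instead of re-reading S3; the bucket paths and column names below are hypothetical.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('monolithic-aggregations').getOrCreate()

# Hypothetical input: events streamed to S3 by an upstream process.
events = spark.read.json('s3://example-bucket/events/')

# Compute an intermediate result once and cache it so every downstream
# aggregation reuses it.
cleaned = events.filter(F.col('event_type').isNotNull()).cache()

daily_counts = cleaned.groupBy('event_date', 'event_type').count()
user_totals = cleaned.groupBy('user_id').agg(F.sum('amount').alias('total_amount'))

daily_counts.write.parquet('s3://example-bucket/aggregates/daily_counts/')
user_totals.write.parquet('s3://example-bucket/aggregates/user_totals/')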