Tags: scala, amazon-web-services, apache-spark, timer

How can my Apache Spark code emit periodic heartbeats?


I'm developing an Apache Spark job that I plan to deploy as one stage of an AWS Step Functions state machine. Unfortunately, the particular way I want to deploy it isn't directly supported by Step Functions at this time. However, Step Functions provides an API for a generic Task that I can use instead. Once such a task has started, my code has to call sendTaskHeartbeat periodically and then call sendTaskSuccess when it completes.
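The contract described above (heartbeat while running, then a single completion call) can be sketched as follows. TaskReporter and ActivityTask are hypothetical names. In a real job, the TaskReporter methods would wrap the Step Functions SendTaskHeartbeat, SendTaskSuccess and SendTaskFailure calls (for example through the AWS SDK for Java's Step Functions client) using the task token the worker was given. This sketch only illustrates the protocol and is not the Step Functions API itself.

```scala
// Hypothetical abstraction over the Step Functions task-token calls.
trait TaskReporter {
  def heartbeat(): Unit                           // would call SendTaskHeartbeat
  def success(outputJson: String): Unit           // would call SendTaskSuccess
  def failure(error: String, cause: String): Unit // would call SendTaskFailure
}

// Enforces the contract: heartbeats only while the task is running, and at
// most one terminal call (success or failure), even when called from several threads.
final class ActivityTask(reporter: TaskReporter) {
  private var finished = false // guarded by `this`

  /** Sends a heartbeat if the task is still running; returns whether one was sent. */
  def beat(): Boolean = synchronized {
    if (finished) false
    else { reporter.heartbeat(); true }
  }

  def succeed(outputJson: String): Unit = synchronized {
    if (!finished) { finished = true; reporter.success(outputJson) }
  }

  def fail(error: String, cause: String): Unit = synchronized {
    if (!finished) { finished = true; reporter.failure(error, cause) }
  }
}
```

Because every method takes the same lock, a heartbeat can never be sent after sendTaskSuccess, which matters once the heartbeat runs on its own timer thread.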

My Spark job is written in Scala, and I'm wondering what the best approach is for running something on a timer within an Apache Spark job. Other answers suggest java.util.concurrent or java.util.Timer, but I'm not sure how the threading works in a Spark context specifically. Since Spark already does a lot to distribute my code across the nodes, I'm not sure whether there are hidden considerations I need to be wary of (e.g. I don't want more than one instance of my timer, and I want to make sure it stops when the Spark parts of my code complete).
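One way to address the "only one instance" and "make sure it stops" concerns with java.util.concurrent is sketched below, assuming the heartbeat is started and stopped from driver code only. DriverHeartbeat is a hypothetical helper: start() is idempotent, the scheduler thread is a daemon, and stop() cancels future runs and waits for an in-flight heartbeat to finish.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical driver-side heartbeat: create it in driver code and never
// reference it inside a Spark closure (map, filter, foreach, ...).
final class DriverHeartbeat(periodMs: Long, beat: () => Unit) {
  private var scheduler: ScheduledExecutorService = null // guarded by `this`

  /** Starts the schedule; calling it again while running is a no-op. */
  def start(): Unit = synchronized {
    if (scheduler == null) {
      val daemonFactory: ThreadFactory = (r: Runnable) => {
        val t = new Thread(r, "driver-heartbeat")
        t.setDaemon(true) // never keeps the driver JVM alive on its own
        t
      }
      scheduler = Executors.newSingleThreadScheduledExecutor(daemonFactory)
      scheduler.scheduleAtFixedRate(() => {
        // An exception escaping a periodic task suppresses all later runs, so catch it
        try beat() catch { case e: Exception => System.err.println(s"heartbeat failed: $e") }
      }, 0L, periodMs, TimeUnit.MILLISECONDS)
    }
  }

  /** Stops future heartbeats; safe to call more than once. */
  def stop(): Unit = synchronized {
    if (scheduler != null) {
      scheduler.shutdown() // cancels the periodic task without interrupting a running beat
      scheduler.awaitTermination(1, TimeUnit.SECONDS)
      scheduler = null
    }
  }
}
```

Since the object is created in the driver and never captured by a closure, Spark has no reason to copy it to executors, so there is one instance per driver JVM. Calling stop() in a finally block after the last Spark action covers the "stop when the Spark parts complete" requirement.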

Is it safe to use a regular Timer in a Spark job? If I did something like this:

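import java.util.{Timer, TimerTask}

val timer = new Timer("heartbeat", true)  // daemon thread, so it can't keep the JVM alive
val task = new TimerTask {
    override def run(): Unit = { /* sendTaskHeartbeat */ }
}
timer.schedule(task, 1000L, 1000L)

val myDF = spark.read.parquet(pathToParquetFiles)  // returns a DataFrame, not an RDD
val transformedRDD = myDF.rdd.map( /* business logic */ )
try transformedRDD.saveAsHadoopDataset(config)  // needs an RDD of key/value pairs
finally timer.cancel()  // also discards the scheduled task

Would that be sufficient? Or is there a risk that, because the work is distributed across nodes, this code would lose track of the task and timer objects by the time it reaches the cancel call?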
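In code like the snippet above, only the function passed to map is serialized and shipped to executors; the Timer, the TimerTask and the cancel call all run once in the driver JVM. A quick check with plain Java serialization, which is what Spark uses for closures, shows that a java.util.Timer could not be shipped even by accident. Capturing it inside map would make Spark fail with a "Task not serializable" error rather than silently creating extra timers. SerializationCheck is a hypothetical helper written for this illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Mimics the step where Spark Java-serializes a closure before sending it to executors.
object SerializationCheck {
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

For example, SerializationCheck.isJavaSerializable(new java.util.Timer(true)) returns false because java.util.Timer does not implement Serializable, while a plain String returns true.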


Solution

  • I ended up using a combination of a java.util.Timer and a SparkListener. I instantiate the Timer in the onJobStart callback, but only once (if (TIMER == null) { /* instantiate */ }), because onJobStart fires for every Spark job and a single application can run several jobs (one per action). I then handle the completion work in onApplicationEnd, which fires only once. I didn't use onApplicationStart because, by the time I had registered my listener with the SparkContext, that event had apparently already fired.
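A minimal sketch of the listener approach described above, assuming Spark 2.x or 3.x. HeartbeatListener and its heartbeat and onDone callbacks are hypothetical names; the callbacks are where the sendTaskHeartbeat and sendTaskSuccess calls would go. The timer lives inside the listener, so it exists only in the driver, and the null check keeps repeated onJobStart events from creating extra timers.

```scala
import java.util.{Timer, TimerTask}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}

// Hypothetical listener: `heartbeat` and `onDone` stand in for the
// sendTaskHeartbeat and sendTaskSuccess calls.
class HeartbeatListener(heartbeat: () => Unit, onDone: () => Unit, periodMs: Long)
    extends SparkListener {

  private var timer: Timer = null // guarded by `this`

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    // onJobStart fires for every job, so only the first one creates the timer
    if (timer == null) {
      timer = new Timer("sfn-heartbeat", true) // daemon: never blocks driver shutdown
      timer.schedule(new TimerTask {
        override def run(): Unit =
          // An uncaught exception would kill the Timer thread and stop all heartbeats
          try heartbeat() catch { case e: Exception => System.err.println(s"heartbeat failed: $e") }
      }, 0L, periodMs)
    }
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = synchronized {
    if (timer != null) timer.cancel() // stops future heartbeats; one already running may still finish
    onDone()
  }
}
```

It would be registered from the driver with spark.sparkContext.addSparkListener(...). SparkListenerApplicationEnd is posted when the SparkContext is stopped, so the job should call spark.stop() at the end. Listeners named in the spark.extraListeners setting are registered while the SparkContext starts and therefore also receive onApplicationStart, but Spark instantiates those itself and requires a zero-argument (or single SparkConf) constructor, which this sketch does not have.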