Tags: scala, apache-spark, typesafe-config

How to use TypeSafe config with Apache Spark?


I have a Spark application which I am trying to package as a fat jar and deploy to the local cluster with spark-submit. I am using Typesafe config to create config files for various deployment environments - local.conf, staging.conf, and production.conf - and trying to submit my jar.

The command I am running is the following:

/opt/spark-3.0.1-bin-hadoop2.7/bin/spark-submit \
--master spark://127.0.0.1:7077 \
--files ../files/local.conf \
--driver-java-options '-Dconfig.file=local.conf' \
target/scala-2.12/spark-starter-2.jar  

I built the command incrementally, adding one option at a time. With --files alone, the logs suggest that the file is uploaded to Spark, but as soon as I add --driver-java-options, submission fails because the file cannot be found:

Caused by: java.io.FileNotFoundException: local.conf (No such file or directory)
        at java.base/java.io.FileInputStream.open0(Native Method)
        at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
        at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
        at com.typesafe.config.impl.Parseable$ParseableFile.reader(Parseable.java:629)
        at com.typesafe.config.impl.Parseable.reader(Parseable.java:99)
        at com.typesafe.config.impl.Parseable.rawParseValue(Parseable.java:233)
        at com.typesafe.config.impl.Parseable.parseValue(Parseable.java:180)
        ... 35 more

Code:

import com.example.spark.settings.Settings
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession

object App extends App {
  val config = ConfigFactory.load()
  val settings = Settings(config = config)

  val spark = SparkSession
    .builder()
    .getOrCreate()

  spark.stop()
}

What do I need to change so that I can provide config files separately?


Solution

  • According to the Spark docs, files passed with --files are placed in the working directory of each executor. You, however, are trying to read the file from the driver, not from an executor.

    To load the config on the driver side, point -Dconfig.file at a path the driver itself can read, for example:

    /opt/spark-3.0.1-bin-hadoop2.7/bin/spark-submit \
    --master spark://127.0.0.1:7077 \
    --driver-java-options '-Dconfig.file=../files/local.conf' \
    target/scala-2.12/spark-starter-2.jar  
    

    If you want to load the config on the executor side instead, set the spark.executor.extraJavaOptions property. In that case the config must be loaded inside a lambda that runs on the executor. An example for the RDD API:

    
    myRdd.map { row => 
      val config = ConfigFactory.load()
      ...
    }
    

    The config is then visible only within the scope of that lambda. This approach is fairly complicated; a simpler one is described below.
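
    As a rough illustration of the executor-side variant, here is a minimal sketch. It assumes that local.conf is shipped with --files and that spark.executor.extraJavaOptions contains -Dconfig.file=local.conf; the object name and the key app.scale-factor are hypothetical and do not come from the question.

    ```scala
    import com.typesafe.config.ConfigFactory
    import org.apache.spark.sql.SparkSession

    object ExecutorSideConfig {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()

        val numbers = spark.sparkContext.parallelize(1 to 100)

        // mapPartitions invokes the function once per partition on an executor,
        // so ConfigFactory.load() is called once per partition, not once per row.
        val scaled = numbers.mapPartitions { rows =>
          // Evaluated in the executor JVM, so it honours the -Dconfig.file value
          // set through spark.executor.extraJavaOptions.
          val config = ConfigFactory.load()
          val factor = config.getInt("app.scale-factor") // hypothetical key
          rows.map(_ * factor)
        }

        println(scaled.sum())
        spark.stop()
      }
    }
    ```

    The config value never exists on the driver here, which is why anything derived from it has to stay inside the lambda.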

    My general recommendation on how to work with custom configs in Spark:

    1. Read this chapter of Spark Docs
    2. Load the config on the driver side
    3. Map the settings you need to an immutable case class
    4. Pass this case class to the executors via closures
    5. Keep the settings case class as small as possible; every field type should be either a primitive or implement java.io.Serializable
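
    Steps 2 to 5 could look roughly like the sketch below. The JobSettings case class and the keys app.greeting and app.scale-factor are made up for illustration; the question's own Settings class is not shown, so this is not a reconstruction of it.

    ```scala
    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.spark.sql.SparkSession

    // Hypothetical settings holder: only a String and an Int, so it is
    // serializable and cheap to ship inside a closure.
    final case class JobSettings(greeting: String, scaleFactor: Int)

    object JobSettings {
      // "app.greeting" and "app.scale-factor" are assumed keys for this sketch.
      def fromConfig(config: Config): JobSettings =
        JobSettings(
          greeting = config.getString("app.greeting"),
          scaleFactor = config.getInt("app.scale-factor")
        )
    }

    object DriverSideConfig {
      def main(args: Array[String]): Unit = {
        // Runs on the driver, where -Dconfig.file points at a readable path.
        val settings = JobSettings.fromConfig(ConfigFactory.load())

        val spark = SparkSession.builder().getOrCreate()

        val labelled = spark.sparkContext
          .parallelize(1 to 10)
          // `settings` is captured by the closure and serialized to the executors;
          // this code never sends the Config object itself.
          .map(n => s"${settings.greeting} ${n * settings.scaleFactor}")

        labelled.collect().foreach(println)
        spark.stop()
      }
    }
    ```

    Because only the small case class crosses the wire, the executors need neither the config file nor any -Dconfig.file option.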

    On EMR specifically, it is hard to get access to the driver's filesystem, so it is preferable to keep the config in external storage, typically S3.

    The Typesafe config library cannot load files directly from S3. Instead, pass the path to the config as an application argument rather than as a -D property, read the file from S3 using AmazonS3Client, and then parse its contents with ConfigFactory.parseString(). See this answer as an example.
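
    A minimal sketch of that S3 approach, assuming the AWS SDK for Java v1 is on the classpath and that credentials and region come from the default provider chain. It builds the client with AmazonS3ClientBuilder rather than constructing AmazonS3Client directly; the object names and the key app.greeting are hypothetical.

    ```scala
    import com.amazonaws.services.s3.{AmazonS3ClientBuilder, AmazonS3URI}
    import com.typesafe.config.{Config, ConfigFactory}

    object S3ConfigLoader {
      // configUri is expected to look like s3://some-bucket/path/to/app.conf
      def load(configUri: String): Config = {
        val uri = new AmazonS3URI(configUri)
        val s3 = AmazonS3ClientBuilder.defaultClient()
        try {
          val text = s3.getObjectAsString(uri.getBucket, uri.getKey)
          // parseString only parses the text; load(...) additionally layers system
          // properties on top and reference.conf underneath, then resolves it.
          ConfigFactory.load(ConfigFactory.parseString(text))
        } finally s3.shutdown()
      }
    }

    object S3ConfigApp {
      def main(args: Array[String]): Unit = {
        // The S3 location arrives as a plain application argument.
        val configUri = args.headOption
          .getOrElse(sys.error("usage: S3ConfigApp <s3-uri-of-config>"))
        val config = S3ConfigLoader.load(configUri)
        println(config.getString("app.greeting")) // hypothetical key
      }
    }
    ```

    With spark-submit, the S3 URI would go after the jar path, so it reaches main as args(0).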