Search code examples
scala, apache-spark, apache-kafka, command-line-arguments, scopt

How to parse a case class whose fields are other case classes using the "scopt" command-line argument parser?


Parsing simple fields (e.g. master: String = "") works, but what if such a field lives inside another case class, as in case class JobArgs(sparkArgs: SparkArgs, kafkaArgs: KafkaArgs) given below, and I need to parse JobArgs?


/** Command-line configuration for the Spark job.
  *
  * `master` defaults to the empty string so that `SparkArgs()` can be used as
  * the initial configuration handed to the parser below.
  */
case class SparkArgs(master: String = "")

// Parser with a single mandatory option: `--master` (short form `-c`).
val parser = new scopt.OptionParser[SparkArgs]("testing") {
  head("spark-example", "2.11")

  opt[String]('c', "master")
    .required()
    .valueName("spark-master")
    .action((x, c) => c.copy(master = x))
    .text("Setting master is required")
}

// Parse the arguments starting from an all-defaults configuration.
parser.parse(args, SparkArgs()) match {
  case Some(config) =>
    // do stuff
    println(config.master)
  case None => // failed
}
//With the parser above I am able to set master by running: run --master=local[2]

//Now, how do I parse the arguments when the fields are themselves case classes (instead of String or Int) whose own fields also need to be parsed, i.e. with scopt.OptionParser[JobArgs]?
//eg -

 /** Kafka-related settings; both fields carry a default value. */
 case class KafkaArgs(
   kafkaPORT: String = "",
   checkpointPath: String = "src/main/resources/checkpoints"
 )

 /** Spark-related settings. */
 case class SparkArgs(master: String = "")

 /** Top-level job configuration composed of the Spark and Kafka settings. */
 case class JobArgs(
   sparkArgs: SparkArgs,
   kafkaArgs: KafkaArgs
 )


//I tried-
// Attempted parser for the nested JobArgs configuration (still incomplete).
val parser = new scopt.OptionParser[JobArgs]("testing") {
    head("spark-example", "2.11")

   // Open question: how to declare options here that fill the nested fields
   // `master` (inside SparkArgs) and `kafkaPORT` (inside KafkaArgs)?
  }

//and run similarly as>> run --master=local[2] --kafkaPORT=localhost:9092 

Solution

  • How about something like:

    /** Settings for the Kafka side of the job; every field has a default. */
    case class KafkaArgs(
      kafkaPORT: String = "",
      checkpointPath: String = "src/main/resources/checkpoints"
    )

    /** Settings for the Spark side of the job. */
    case class SparkArgs(master: String = "")

    /** Aggregated job configuration; `JobArgs()` yields an all-defaults instance. */
    case class JobArgs(
      sparkArgs: SparkArgs = SparkArgs(),
      kafkaArgs: KafkaArgs = KafkaArgs()
    )
    
    /** Example entry point: fills a nested case-class configuration with scopt. */
    object StackoverFlow {

      /** Parses `--master` and `--kafkaPort` into a [[JobArgs]] and prints it.
        *
        * Terminates the JVM with exit status 1 when parsing does not succeed.
        *
        * @param args raw command-line arguments
        */
      def main(args: Array[String]): Unit = {

        val parser = new scopt.OptionParser[JobArgs]("testing") {
          head("spark-example", "2.11")

          // Update only the nested field via copy, so that any other value
          // already present in sparkArgs is preserved.
          opt[String]("master")
            .required()
            .valueName("spark-master")
            .action((master, c) => c.copy(sparkArgs = c.sparkArgs.copy(master = master)))
            .text("Spark master URL, e.g. local[2] (required)")

          // Same pattern for Kafka: checkpointPath keeps whatever value it had.
          opt[String]("kafkaPort")
            .required()
            .valueName("kafka-port")
            .action((kafkaPort, c) => c.copy(kafkaArgs = c.kafkaArgs.copy(kafkaPORT = kafkaPort)))
            .text("Kafka port (required)")
        }

        parser.parse(args, JobArgs()) match {
          case Some(c) => println(c)
          case None    => sys.exit(1)
        }
      }
    }
    

    Executing it with the arguments --master FCM --kafkaPort 1965 leads to the following output:

    JobArgs(SparkArgs(FCM),KafkaArgs(1965,src/main/resources/checkpoints))
    

    In case you also want to pass the checkpointPath, you can achieve this via:

    /** Kafka options: the port string and the checkpoint path, both defaulted. */
    case class KafkaArgs(
      kafkaPORT: String = "",
      checkpointPath: String = "src/main/resources/checkpoints"
    )

    /** Spark options: currently only the master setting. */
    case class SparkArgs(master: String = "")

    /** Root configuration bundling [[SparkArgs]] and [[KafkaArgs]], each defaulted. */
    case class JobArgs(
      sparkArgs: SparkArgs = SparkArgs(),
      kafkaArgs: KafkaArgs = KafkaArgs()
    )
    
    /** Example entry point: groups the Kafka options under a `--kafka` parent option. */
    object StackoverFlow {

      /** Parses `--master` plus `--kafka` with its child options `--port` / `--checkpointPath`
        * into a [[JobArgs]] and prints it.
        *
        * Terminates the JVM with exit status 1 when parsing does not succeed.
        *
        * @param args raw command-line arguments
        */
      def main(args: Array[String]): Unit = {

        val parser = new scopt.OptionParser[JobArgs]("testing") {
          head("spark-example", "2.11")

          // Update only the nested field via copy, so that any other value
          // already present in sparkArgs is preserved.
          opt[String]("master")
            .required()
            .valueName("spark-master")
            .action((master, c) => c.copy(sparkArgs = c.sparkArgs.copy(master = master)))
            .text("Spark master URL, e.g. local[2] (required)")

          // Value-less parent option; its action starts from default KafkaArgs and
          // the child options below then overwrite individual fields.
          opt[Unit]("kafka")
            .action((_, c) => c.copy(kafkaArgs = KafkaArgs()))
            .text("Kafka settings")
            .children(
              opt[String]('p', "port")
                .required()
                .action((x, c) => c.copy(kafkaArgs = c.kafkaArgs.copy(kafkaPORT = x)))
                .text("Kafka port"),
              opt[String]('c', "checkpointPath")
                .required()
                .action((x, c) => c.copy(kafkaArgs = c.kafkaArgs.copy(checkpointPath = x)))
                .text("Checkpoint path")
            )
        }

        parser.parse(args, JobArgs()) match {
          case Some(c) => println(c)
          case None    => sys.exit(1)
        }
      }
    }
    

    Correspondingly, executing it with the following parameters:

    --master FC --kafka --port 1965 --checkpointPath Mag/de/burg

    leads to the following output: JobArgs(SparkArgs(FC),KafkaArgs(1965,Mag/de/burg))