Tags: scala, dataframe, apache-spark, apache-spark-sql, apache-spark-dataset

How to pass a schema (metadata) as an argument when reading a Spark dataframe/dataset (schema name as an argument)


I want to pass a schema (metadata) as an argument when reading a Spark dataframe/dataset.

I'm using Spark 2.x.

Code: (Sample)

// Define metadata like below.

import org.apache.spark.sql.types._

val df_emp_metadata = StructType(
  List(
    StructField("emp_id", StringType, true),
    StructField("emp_hier_dt", DateType, true),
    StructField("dept_id", IntegerType, true)
  ))

  
val df_dept_metadata = StructType(
  List(
    StructField("dept_id", IntegerType, true),
    StructField("dept_name", StringType, true)
  ))

I want to pass df_emp_metadata or df_dept_metadata as an argument at spark-submit time and use it as the schema below.

val meta_Data = args(0)  // "df_emp_metadata" or "df_dept_metadata" from spark-submit

val readFileIn = spark.read
    .format("csv")
    .schema(meta_Data)   // fails: schema() expects a StructType, not a String
    .load("data/source_file.csv")

Spark does not allow resolving a variable from its name passed as a String argument, so this does not work.

Please suggest any alternative ways to do this in Spark/Scala.


Solution

  • Use a simple if-else statement. Pass "1" or "2" (or any other key) as the argument and select the schema accordingly.

    val argument = args(0)

    val schema = if (argument == "1") df_emp_metadata else df_dept_metadata

    val readFileIn = spark.read
        .format("csv")
        .schema(schema)
        .load("data/source_file.csv")
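If more than two schemas are involved, a lookup table scales better than chained if-else. Below is a minimal sketch of that pattern; the object name SchemaSelect is my own, and String stand-ins replace the StructType values (df_emp_metadata, df_dept_metadata) so the sketch runs without a Spark dependency. In the real job the Map values would be the StructTypes and the result would be passed to .schema(...).

```scala
// Hypothetical sketch: resolve a schema by the name given on spark-submit.
object SchemaSelect {
  // Look up the schema registered under `name`, failing fast on an unknown key.
  // `A` would be org.apache.spark.sql.types.StructType in the real job.
  def select[A](schemas: Map[String, A], name: String): A =
    schemas.getOrElse(
      name,
      throw new IllegalArgumentException(
        s"Unknown schema '$name'; expected one of: ${schemas.keys.mkString(", ")}"))

  def main(args: Array[String]): Unit = {
    // In the real job: "emp" -> df_emp_metadata, "dept" -> df_dept_metadata
    val schemas = Map("emp" -> "emp-schema", "dept" -> "dept-schema")
    val chosen = select(schemas, args.headOption.getOrElse("emp"))
    println(chosen)
  }
}
```

Running `spark-submit ... app.jar emp` would then pick the employee schema. Separately, if you are on Spark 2.3 or later, you could avoid the lookup table entirely by passing the schema itself as a DDL string (e.g. "emp_id STRING, dept_id INT") and parsing it with StructType.fromDDL.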