Tags: apache-spark, dataframe, apache-spark-sql, rdd, spark-csv

Programmatically generate the schema AND the data for a dataframe in Apache Spark


I would like to dynamically generate a DataFrame containing a header record for a report, i.e. create a DataFrame from the value of the string below:

val headerDescs : String = "Name,Age,Location"

val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

Now, however, I want to do the same for the data, which in this case is effectively the same values, i.e. the metadata.

I create an RDD:

val headerRDD = sc.parallelize(headerDescs.split(","))

I then intended to use createDataFrame to create it:

val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)

However, that fails because createDataFrame expects an RDD[Row], whereas my RDD is an RDD of strings. I can't find a way of converting my RDD to an RDD[Row] and then mapping the fields dynamically. The examples I've seen assume you know the number of columns beforehand, but I want the ability to change the columns without changing the code, e.g. by keeping the column names in a file.

Code excerpt based on the first answer:

val headerDescs : String = "Name,Age,Location"

// create the schema from a string, splitting by delimiter
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

// create a row from a string, splitting by delimiter
val headerRDDRows = sc.parallelize(headerDescs.split(",")).map( a => Row(a))

val headerDf = sqlContext.createDataFrame(headerRDDRows, headerSchema)
headerDf.show()

Executing this results in:

+--------+---+--------+
|    Name|Age|Location|
+--------+---+--------+
|    Name|
|     Age|
|Location|
+--------+---+--------+

Solution

  • For converting an RDD[Array[String]] to an RDD[Row] you need to do the following:

    import org.apache.spark.sql.Row

    val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
    
    scala> val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
    headerSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,StringType,true), StructField(Location,StringType,true))
    
    scala> val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
    headerRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:34
    
    scala> val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
    headerDf: org.apache.spark.sql.DataFrame = [Name: string, Age: string, Location: string]
    
    
    scala> headerDf.printSchema
    root
     |-- Name: string (nullable = true)
     |-- Age: string (nullable = true)
     |-- Location: string (nullable = true)
    
    
    
    scala> headerDf.show
    +----+---+--------+
    |Name|Age|Location|
    +----+---+--------+
    |Name|Age|Location|
    +----+---+--------+
    

    This gives you an RDD[Row].
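
    If you would rather not hard-code the field positions (x(0), x(1), x(2)), one variation, sketched below with the same headerDescs and headerSchema as above, is to build each Row with Row.fromSeq, which copes with any number of columns:

    import org.apache.spark.sql.Row

    // Row.fromSeq builds the Row from whatever fields are present, so the
    // column list can grow or shrink without touching this code.
    val headerRDD = sc.parallelize(Seq(headerDescs.split(",").toSeq))
                      .map(fields => Row.fromSeq(fields))

    val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
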

    For reading from a file:

    val vRDD = sc.textFile("..**filepath**.").map(_.split(",")).map(a => Row.fromSeq(a))
     
    val headerDf = sqlContext.createDataFrame(vRDD, headerSchema)
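
    As a further sketch (assuming the first line of the file contains the comma-separated column names; the filepath placeholder is the same as above), you can also derive the schema itself from the file, so the columns live entirely in the file and changing them never requires a code change:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType, StringType}

    val lines      = sc.textFile("..**filepath**.")
    val headerLine = lines.first()                      // e.g. "Name,Age,Location"
    val fileSchema = StructType(headerLine.split(",")
                       .map(name => StructField(name, StringType, true)))

    // drop the header line, then convert the remaining lines to Rows
    val dataRDD = lines.filter(_ != headerLine)
                       .map(_.split(",")).map(a => Row.fromSeq(a))

    val df = sqlContext.createDataFrame(dataRDD, fileSchema)
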
    

    Using the Spark-CSV package:

     val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // Use first line of all files as header
        .schema(headerSchema) // defining based on the custom schema
        .load("cars.csv")
    

    OR

    val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // Use first line of all files as header
        .option("inferSchema", "true") // Automatically infer data types
        .load("cars.csv")
    

    There are also various other options which you can explore in its documentation.
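
    For example, a brief sketch combining a few of the documented spark-csv options (a custom delimiter, a null marker, and a parse mode), again using the sample cars.csv from above:

     val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("delimiter", "|")         // field separator other than ','
        .option("nullValue", "NA")        // treat this string as null
        .option("mode", "DROPMALFORMED")  // silently skip lines that fail to parse
        .load("cars.csv")
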