
Spark - Map flat dataframe to a configurable nested json schema

I have a flat dataframe with 5-6 columns. I want to nest them and convert it into a nested dataframe so that I can then write it to parquet format.

However, I don't want to use case classes as I am trying to keep the code as configurable as possible. I'm stuck with this part and need some help.

My Input :

ID  ID-2  Count(apple)  Count(banana)  Count(potato)  Count(Onion)
 1  23    1             0              2              0
 2  23    0             1              0              1
 2  29    1             0              1              0

My Output :

ROW 1 :

{
  "id": 1,
  "ID-2": 23,
  "fruits": {
    "count of apple": 1,
    "count of banana": 0
  },
  "vegetables": {
    "count of potato": 2,
    "count of onion": 0
  }
}

I have tried using the "map" function in the spark dataframe where I map my values to a case class. However, I will be playing around with the name of the fields and might change them too.

I don't want to maintain a case class and map rows to the SQL column names, as that would involve code changes every time.

I was thinking of maintaining a HashMap that maps the column names of the dataframe to the names I want to keep. For instance, in the example, I'm mapping "Count(apple)" to "count of apple". However, I can't think of a nice, easy way to pass my schema as a config and then map it in my code.


  • Here is one approach that uses a Scala Map to hold the column mappings, applied to the following dataset:

    // toDF needs spark.implicits._ in scope (spark-shell imports it automatically)
    val df = Seq(
    (1, 23, 1, 0, 2, 0),
    (2, 23, 0, 1, 0, 1),
    (2, 29, 1, 0, 1, 0)).toDF("ID", "ID-2", "count(apple)", "count(banana)", "count(potato)", "count(onion)")

    First we declare the mappings as a scala.collection.immutable.Map, together with the function that applies them:

    import org.apache.spark.sql.{Column, DataFrame}
    val colMapping = Map(
            "count(banana)" -> "no of banana", 
            "count(apple)" -> "no of apples", 
            "count(potato)" -> "no of potatos", 
            "count(onion)" -> "no of onions")
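    def mapColumns(colsMapping: Map[String, String], df: DataFrame) : DataFrame = {
           // Alias every column that has an entry in the mapping; keep the rest unchanged.
           val mapping: Array[Column] = df.columns
             .map{ c => if (colsMapping.contains(c)) df(c).alias(colsMapping(c)) else df(c)}
           df.select(mapping: _*)
    }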

    The function iterates through the columns of the given dataframe and identifies those whose names appear as keys in the mapping. Each of these is returned under its new name (via alias); all other columns are returned unchanged.

    Output of mapColumns(colMapping, df).show(false):

    |ID |ID-2|no of apples|no of banana|no of potatos|no of onions|
    |1  |23  |1           |0           |2            |0           |
    |2  |23  |0           |1           |0            |1           |
    |2  |29  |1           |0           |1            |0           |

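    Finally, with df1 = mapColumns(colMapping, df), we build the fruits and vegetables columns with struct (struct and col come from org.apache.spark.sql.functions):

    df1.withColumn("fruits", struct(col(colMapping("count(banana)")), col(colMapping("count(apple)"))))
    .withColumn("vegetables", struct(col(colMapping("count(potato)")), col(colMapping("count(onion)"))))
    .drop(colMapping.values.toList: _*)
    .toJSON.show(false)

    Note that we drop all the renamed top-level columns listed in colMapping once the structs are built, then print each row as JSON: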


    |value                                                                                                            |
    |{"ID":1,"ID-2":23,"fruits":{"no of banana":0,"no of apples":1},"vegetables":{"no of potatos":2,"no of onions":0}}|
    |{"ID":2,"ID-2":23,"fruits":{"no of banana":1,"no of apples":0},"vegetables":{"no of potatos":0,"no of onions":1}}|
    |{"ID":2,"ID-2":29,"fruits":{"no of banana":0,"no of apples":1},"vegetables":{"no of potatos":1,"no of onions":0}}|
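
The struct step above still hard-codes which columns go into which struct. Below is a minimal sketch of how the grouping itself could also be driven by configuration. The `nesting` value (struct name mapped to pairs of source column and nested field name) and the `nestColumns` helper are hypothetical names introduced here for illustration; they are not part of the original answer. The field names follow the question's desired output, and in practice the config could presumably be loaded from a file rather than hard-coded.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.struct

object NestByConfig {
  // Hypothetical config: struct name -> (source column -> nested field name).
  // Hard-coded here; an assumption is that it could equally be read from a config file.
  val nesting: Seq[(String, Seq[(String, String)])] = Seq(
    "fruits" -> Seq(
      "count(apple)" -> "count of apple",
      "count(banana)" -> "count of banana"),
    "vegetables" -> Seq(
      "count(potato)" -> "count of potato",
      "count(onion)" -> "count of onion")
  )

  def nestColumns(df: DataFrame, nesting: Seq[(String, Seq[(String, String)])]): DataFrame = {
    // Names of all source columns that end up inside some struct.
    val nestedSources: Set[String] = nesting.flatMap(_._2.map(_._1)).toSet

    // Columns not mentioned in the config pass through unchanged.
    val passThrough: Seq[Column] =
      df.columns.toSeq.filterNot(nestedSources.contains).map(c => df(c))

    // One struct column per config entry, with each field renamed via alias.
    val structs: Seq[Column] = nesting.map { case (structName, fields) =>
      struct(fields.map { case (src, dst) => df(src).alias(dst) }: _*).alias(structName)
    }

    df.select(passThrough ++ structs: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nest-by-config")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, 23, 1, 0, 2, 0),
      (2, 23, 0, 1, 0, 1),
      (2, 29, 1, 0, 1, 0)
    ).toDF("ID", "ID-2", "count(apple)", "count(banana)", "count(potato)", "count(onion)")

    val nested = nestColumns(df, nesting)
    nested.printSchema()
    nested.toJSON.show(false)

    spark.stop()
  }
}
```

Because renaming and nesting happen in a single select, there are no intermediate renamed columns to drop afterwards. Changing a field name or moving a column to another struct then only requires editing `nesting`.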