Tags: dataframe, scala, apache-spark, apache-spark-sql

How to convert a flat DataFrame to a struct DataFrame and then create a view?


This is my table structure:

customer_item(struct)
   customer_name(string)
   customer_detail(struct)
       bill_to_city(string)
       bill_to_country(string)
       bill_to_state(string)
       contact_phone(string)
       country_name(string)
       partner_name(string)

This is the DataFrame I have. I want to use it to create a view that can be reused later, but its structure is flat; I need to convert it to the nested structure above, otherwise the view I create cannot be used.

+-------------+------------+---------------+-------------+-------------+-------------+------------+
|customer_name|bill_to_city|bill_to_country|bill_to_state|contact_phone|country_name |partner_name|
+-------------+------------+---------------+-------------+-------------+-------------+------------+
|abc          |Arlington   |US             |Texas        |123.456.7890 |United States|name        |
+-------------+------------+---------------+-------------+-------------+-------------+------------+

My code (shown below) fails with this error:

Invalid call to qualifier on unresolved object, tree: 'customer_item'

How can I fix the issue?

val selectedDf = df.select(col("customer_detail")).select("customer_detail.*")

selectedDf.show(false)

val tempViewName = "tempStageTransView"

selectedDf.createOrReplaceTempView(tempViewName)
executeInsertIntoTable(spark).show()

private def executeInsertIntoTable(sparkSession: SparkSession) = {
    sparkSession.sql(raw"""
        INSERT OVERWRITE TABLE ${myTable}
        SELECT
            null,
            (customer detail here)
            null,
            null,
            null,
            null
        FROM tempStageTransView
    """)
}

Solution

  • You can create a struct by using the struct function from org.apache.spark.sql.functions.

    For example, like this (note that struct is used twice, since your schema has two nested structs):

    import org.apache.spark.sql.functions.{col, struct}
    import spark.implicits._ // needed for .toDF on a local Seq; spark is the active SparkSession
    
    val df = Seq(
      ("abc","Arlington","US","Texas","123.456.7890","United States","name")
    ).toDF("customer_name","bill_to_city","bill_to_country","bill_to_state","contact_phone","country_name","partner_name")
    
    val output = df.select(
      struct(
        col("customer_name"),
        struct(
          "bill_to_city",
          "bill_to_country",
          "bill_to_state",
          "contact_phone",
          "country_name",
          "partner_name"
        ).as("customer_detail")
      ).as("customer_item")
    )
    
    scala> output.printSchema
    root
     |-- customer_item: struct (nullable = false)
     |    |-- customer_name: string (nullable = true)
     |    |-- customer_detail: struct (nullable = false)
     |    |    |-- bill_to_city: string (nullable = true)
     |    |    |-- bill_to_country: string (nullable = true)
     |    |    |-- bill_to_state: string (nullable = true)
     |    |    |-- contact_phone: string (nullable = true)
     |    |    |-- country_name: string (nullable = true)
     |    |    |-- partner_name: string (nullable = true)
    

    The .as method renames a column, which is how the customer_detail and customer_item names are attached to the two structs.
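
    Once the nested output DataFrame exists, you can register it as a temp view and query the struct fields with dot notation, which is what the question ultimately needs. A minimal sketch (reusing the tempStageTransView name from the question, and assuming spark is the active SparkSession):

    // Register the nested DataFrame under the view name from the question
    output.createOrReplaceTempView("tempStageTransView")
    
    // Nested struct fields are addressed with dot notation in Spark SQL
    spark.sql("""
      SELECT
        customer_item.customer_name,
        customer_item.customer_detail.bill_to_city,
        customer_item.customer_detail.country_name
      FROM tempStageTransView
    """).show(false)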