Tags: scala, apache-spark, distributed-computing

How can I perform ETL on a Spark Row and return it to a dataframe?


I'm currently using Scala Spark for some ETL and have a base DataFrame with the following schema:

|-- round: string (nullable = true)
|-- Id : string (nullable = true)
|-- questions: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- tag: string (nullable = true)
|    |    |-- bonusQuestions: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- difficulty : string (nullable = true)
|    |    |-- answerOptions: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- followUpAnswers: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|-- school: string (nullable = true)

I only need to perform ETL on rows where round is primary (there are two types, primary and secondary). However, I need both types of rows in my final table.

I'm stuck on the ETL itself, which should follow this rule: if a question's tag is non-bonus, its bonusQuestions and difficulty should both be set to null.
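To pin the rule down, here is a minimal sketch of it as plain Scala over hypothetical case classes that mirror the schema above. It assumes the column names are exactly round, Id, questions and school (no trailing spaces, unlike the printed schema), and that "non-bonus" is the literal value of tag. Only primary rounds are changed, so secondary rows pass through and both kinds stay in the final table.

```scala
// Hypothetical case classes mirroring the schema above.
// Assumption: field names match the columns exactly (no trailing spaces).
case class Question(
    tag: String,
    bonusQuestions: Seq[String],
    difficulty: String,
    answerOptions: Seq[String],
    followUpAnswers: Seq[String])

case class Record(round: String, Id: String, questions: Seq[Question], school: String)

object QuestionRules {
  // Assumption: a non-bonus question has tag == "non-bonus".
  // Clears bonusQuestions and difficulty; any other question is returned as-is.
  def cleanQuestion(q: Question): Question =
    if (q.tag == "non-bonus") q.copy(bonusQuestions = null, difficulty = null) else q

  // Only "primary" rounds are transformed; "secondary" rows are returned unchanged.
  // The questions array may contain null elements, so those are left alone.
  def cleanRecord(r: Record): Record =
    if (r.round == "primary" && r.questions != null)
      r.copy(questions = r.questions.map(q => if (q == null) q else cleanQuestion(q)))
    else r
}
```

With a SparkSession in scope and `import spark.implicits._`, something like `df.as[Record].map(QuestionRules.cleanRecord).toDF()` should apply this as a typed Dataset transformation, provided the case class fields line up with the column names. Defining the case classes at top level, rather than inside a method, avoids encoder-derivation problems.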

I can already access most fields of each row (tr is a Row of the DataFrame), e.g. val round = tr.getAs[String]("round")

Next, I'm able to get the questions array using

val questionsArray = tr.getAs[Seq[StructType]]("questions")

and can iterate over it with for (question <- questionsArray) {...}. However, I cannot access struct fields such as question.bonusQuestions or question.tag; doing so returns the error

error: value tag is not a member of org.apache.spark.sql.types.StructType

Solution

  • StructType only describes the schema; at runtime Spark hands you each struct value as a Row (concretely a GenericRowWithSchema). So instead of Seq[StructType] you have to use Seq[Row]:

    import org.apache.spark.sql.Row

    val questionsArray = tr.getAs[Seq[Row]]("questions")
    

    and inside the loop for (question <- questionsArray) {...} you can read each field of the question Row by name:

    for (question <- questionsArray) {
        // Each element is a Row that carries its schema, so getAs by field name works
        val tag = question.getAs[String]("tag")
        val bonusQuestions = question.getAs[Seq[String]]("bonusQuestions")
        val difficulty = question.getAs[String]("difficulty")
        val answerOptions = question.getAs[Seq[String]]("answerOptions")
        val followUpAnswers = question.getAs[Seq[String]]("followUpAnswers")
    }
    

    I hope the answer is helpful
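Reading the fields is only half of the question; the other half is getting the modified rows back into a DataFrame. A minimal sketch, assuming "non-bonus" is the literal tag value and the column names have no trailing spaces: rebuild each struct as a new Row with the same field order (using fieldIndex so the positions match the schema), then pass the resulting RDD[Row] back to spark.createDataFrame together with the original schema. The names RowEtl, cleanQuestion, cleanRow and applyEtl are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object RowEtl {
  // Assumption: a non-bonus question is one whose tag equals "non-bonus".
  // Nulls out bonusQuestions and difficulty by position so the new Row
  // still lines up with the original struct schema.
  def cleanQuestion(q: Row): Row =
    if (q == null || q.getAs[String]("tag") != "non-bonus") q
    else {
      val updated = q.toSeq
        .updated(q.fieldIndex("bonusQuestions"), null)
        .updated(q.fieldIndex("difficulty"), null)
      Row.fromSeq(updated)
    }

  // Only "primary" rows are rewritten; "secondary" rows are returned unchanged.
  def cleanRow(tr: Row): Row = {
    val round = tr.getAs[String]("round")
    val questions = tr.getAs[Seq[Row]]("questions")
    if (round != "primary" || questions == null) tr
    else Row.fromSeq(tr.toSeq.updated(tr.fieldIndex("questions"), questions.map(cleanQuestion)))
  }

  // An RDD[Row] carries no schema, so the original one is passed back in.
  // Setting nullable fields to null does not change that schema.
  def applyEtl(spark: SparkSession, df: DataFrame): DataFrame =
    spark.createDataFrame(df.rdd.map(cleanRow), df.schema)
}
```

Usage would be along the lines of `val result = RowEtl.applyEtl(spark, df)`. This relies on the rows produced by `df.rdd` carrying their schema (which is what makes `getAs` by name work in the loop above); rows built with `Row.fromSeq` do not, which is why the schema is supplied explicitly at the end.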