Tags: scala, apache-spark, dataset, type-safety

Write transformation in typesafe way


How do I write the code below in a type-safe manner in Spark Scala with the Dataset API:

import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.types.StructType
val schema: StructType = Encoders.product[CaseClass].schema
// read JSON from a file into a typed Dataset
val readAsDataSet: Dataset[CaseClass] =
  sparkSession.read.option("mode", mode).schema(schema).json(path).as[CaseClass]

// below code needs to be written in a type-safe way:
val someDF = readAsDataSet
  .withColumn("col1", explode(col("col_to_be_exploded")))
  .select(from_unixtime(col("timestamp").divide(1000)).as("date"), col("col1"))

Solution

  • As someone in the comments said, you can create a Dataset[CaseClass] and do your operations on it. Let's set it up:

    import spark.implicits._
    
    case class MyTest (timestamp: Long, col_explode: Seq[String])
    
    val df = Seq(
      MyTest(1673850366000L, Seq("some", "strings", "here")),
      MyTest(1271850365998L, Seq("pasta", "with", "cream")),
      MyTest(611850366000L, Seq("tasty", "food"))
    ).toDF("timestamp", "col_explode").as[MyTest]
    
    
    df.show(false)
    +-------------+---------------------+
    |timestamp    |col_explode          |
    +-------------+---------------------+
    |1673850366000|[some, strings, here]|
    |1271850365998|[pasta, with, cream] |
    |611850366000 |[tasty, food]        |
    +-------------+---------------------+
    

    Typically, you can express most row-level operations with the map function and plain Scala.

    A map function returns exactly one output element per input element. The explode function that you're using, however, produces multiple output rows per input row. You can implement that behaviour with the flatMap function.
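
    For comparison, here is a minimal sketch of a plain map that keeps one output row per input row; the DateOnly case class is just illustrative, and the date formatting mirrors the flatMap version below:

    import java.time.LocalDateTime
    import java.time.ZoneOffset

    case class DateOnly (datetime: String)

    // map: exactly one DateOnly per MyTest row
    val dates = df.map { case MyTest(timestamp, _) =>
      DateOnly(LocalDateTime.ofEpochSecond(timestamp/1000, 0, ZoneOffset.UTC).toString)
    }
    // dates has type org.apache.spark.sql.Dataset[DateOnly]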

    So, using the Scala language and the flatMap function together, you can do something like this:

    import java.time.LocalDateTime
    import java.time.ZoneOffset
    
    case class Exploded (datetime: String, exploded: String)
    
    val output = df.flatMap { case MyTest(timestamp, col_explode) =>
      // one input row becomes one output row per exploded value
      col_explode.map { value =>
        val date = LocalDateTime.ofEpochSecond(timestamp / 1000, 0, ZoneOffset.UTC).toString
        Exploded(date, value)
      }
    }
    
    output.show(false)
    +-------------------+--------+
    |datetime           |exploded|
    +-------------------+--------+
    |2023-01-16T06:26:06|some    |
    |2023-01-16T06:26:06|strings |
    |2023-01-16T06:26:06|here    |
    |2010-04-21T11:46:05|pasta   |
    |2010-04-21T11:46:05|with    |
    |2010-04-21T11:46:05|cream   |
    |1989-05-22T14:26:06|tasty   |
    |1989-05-22T14:26:06|food    |
    +-------------------+--------+
    

    As you can see, we've created a second case class called Exploded, which we use to type our output dataset. The output has the type org.apache.spark.sql.Dataset[Exploded], so everything is completely type-safe.
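
    Because the output is a typed Dataset, you can keep chaining typed operations on the Exploded fields. As a small illustrative sketch (the filter predicate below is just an example, not part of the original question):

    // compile-time checked access to the case class fields;
    // the result is still a Dataset[Exploded]
    val startsWithP = output.filter(_.exploded.startsWith("p"))
    startsWithP.show(false)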