scala, apache-spark, apache-spark-sql, apache-spark-dataset

Why does Spark Dataset lose all its schema and just return byte[]?


I create my SparkSession and register Kryo classes this way:

val sparkConf = new SparkConf()
    .setAppName("bd-dq-spark")
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
    .set("spark.driver.host", "127.0.0.1")
    .registerKryoClasses(Array(classOf[HeatSensorEvent], Class.forName("scala.Enumeration$Val"), Class.forName("cs.spark_implicits.Model$EventType$")))
val spark: SparkSession = 
   SparkSession.builder()
     .master("local[*]")
     .config(sparkConf)
     .getOrCreate()

I define my case class this way:

object Model {
  type Timestamp = Long
  case class HeatSensorEvent(
                              eventId: String,
                              sensorId: String,
                              deviceId: String,
                              eventType: EventType,
                              timestamp: Timestamp,
                              temperature: Double
                            )
  object EventType extends Enumeration {
    final type EventType = Value
    val TEMPERATURE_CHANGE: EventType.Value = Value
  }
}

I prepare my fake data this way:

  val heatSensorEventData = Seq(
    HeatSensorEvent("123", "s1", "d1", TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
    HeatSensorEvent("234", "s1", "d1", TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
    HeatSensorEvent("345", "s1", "d1", TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
    HeatSensorEvent("567", "s1", "d1", TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
  )

and my main is this:

def main(args: Array[String]): Unit = {
    implicit val heatSensorEventEncoder: Encoder[HeatSensorEvent] = org.apache.spark.sql.Encoders.kryo[HeatSensorEvent]
    implicit val eventTypeEncoder: Encoder[EventType] = org.apache.spark.sql.Encoders.kryo[EventType.EventType]
    val heatSensorEventDs: Dataset[HeatSensorEvent] = spark
      .createDataset(heatSensorEventData).as[HeatSensorEvent]
    heatSensorEventDs.show
    heatSensorEventDs.printSchema()
}

But all I get is this:

+--------------------+
|               value|
+--------------------+
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
+--------------------+

root
 |-- value: binary (nullable = true)

My question is: why do I lose the whole schema, and why can't I show the normal data? How can I fix this?


Solution

  • When you use Kryo encoders, the whole object is serialized into a single binary column, which makes it impossible to inspect the values with dataset.show().

    The approach below shows how to solve this; it originated from this post (unfortunately, only an http link is available).
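
    To see why this happens, compare the schemas produced by the Kryo encoder and by the product encoder that Spark derives for case classes. This is a minimal sketch using a hypothetical SimpleEvent class, not code from the original post:

    import org.apache.spark.sql.{Encoder, Encoders}

    // Hypothetical case class, used only to compare the two encoders
    case class SimpleEvent(id: String, temperature: Double)

    // Kryo serializes the whole object into one opaque binary column
    val kryoEncoder: Encoder[SimpleEvent] = Encoders.kryo[SimpleEvent]
    kryoEncoder.schema.printTreeString()
    // root
    //  |-- value: binary (nullable = true)

    // The product encoder keeps one column per case class field
    val productEncoder: Encoder[SimpleEvent] = Encoders.product[SimpleEvent]
    productEncoder.schema.printTreeString()
    // root
    //  |-- id: string (nullable = true)
    //  |-- temperature: double (nullable = false)

    Because EventType extends scala.Enumeration, Spark cannot derive a product encoder for HeatSensorEvent, which is presumably what forced the fallback to Kryo; the model below avoids the enumeration instead.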

    Define your classes:

    type Timestamp = Long  
    object Events {
      sealed case class EventType(value: String)
      object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
      val values: Array[EventType] = Array(TEMPERATURE_CHANGE)
    }
    
    case class HeatSensorEvent(
                                eventId: String,
                                sensorId: String,
                                deviceId: String,
                                eventType: Events.EventType,
                                timestamp: Timestamp,
                                temperature: Double
                              )
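
    Since the event type is now a plain case object rather than an Enumeration value, you may also want to map the stored string back to its EventType, for example after reading the data back. A small helper along these lines (my own sketch, not part of the original answer) can use the values array defined above:

    // Hypothetical helper: look up the EventType case object by its string value
    def eventTypeOf(value: String): Option[Events.EventType] =
      Events.values.find(_.value == value)

    eventTypeOf("TEMPERATURE_CHANGE")  // Some(EventType(TEMPERATURE_CHANGE))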
    

    Create your data:

    val heatSensorEventData = Seq(
      HeatSensorEvent("123", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
      HeatSensorEvent("234", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
      HeatSensorEvent("345", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
      HeatSensorEvent("567", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
    )
    

    Now you can see your dataset:

    import spark.implicits._

    val ds = heatSensorEventData.toDS()
    ds.show()
    

    Output:

    +-------+--------+--------+--------------------+----------+-----------+
    |eventId|sensorId|deviceId|           eventType| timestamp|temperature|
    +-------+--------+--------+--------------------+----------+-----------+
    |    123|      s1|      d1|[TEMPERATURE_CHANGE]|1619555389|      85.41|
    |    234|      s1|      d1|[TEMPERATURE_CHANGE]|1619555419|      60.41|
    |    345|      s1|      d1|[TEMPERATURE_CHANGE]|1619556389|      60.41|
    |    567|      s1|      d1|[TEMPERATURE_CHANGE]|1619557389|      50.41|
    +-------+--------+--------+--------------------+----------+-----------+
    ds: org.apache.spark.sql.Dataset[HeatSensorEvent] = [eventId: string, sensorId: string ... 4 more fields]
    
    

    Support for Scala enumerations in Spark encoders has been requested and was closed without a fix. The advantage of this approach is that you don't need to use custom encoders.
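
    To confirm that the schema is preserved, you can also print it; with the model above it should show one column per field, with eventType as a nested struct instead of a binary blob (expected output sketched below, consistent with the [TEMPERATURE_CHANGE] column in the show() result):

    ds.printSchema()
    // root
    //  |-- eventId: string (nullable = true)
    //  |-- sensorId: string (nullable = true)
    //  |-- deviceId: string (nullable = true)
    //  |-- eventType: struct (nullable = true)
    //  |    |-- value: string (nullable = true)
    //  |-- timestamp: long (nullable = false)
    //  |-- temperature: double (nullable = false)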