Search code examples
scalaapache-sparkapache-spark-sqlapache-spark-dataset

Spark DataFrame casting to Dataset Doesn't Update NullType Columns in Schema


I am creating a dataframe that gets initialized with some columns set to null. Before writing out, I typed the dataframe as a case class A. Given that we have a Dataset[A], i assumed that the underlying schema of the dataset would have the correct types; however, the schema is left as NullType. The following is an example of how to reproduce:

case class ExampleInput(
  name: String,
  age: Integer
)

case class ExampleOutput(
  name: String,
  age: Integer,
  filledLater: String
)

// Define Input
val inputDS: Dataset[ExampleInput] = Seq(ExampleInput("asdf", 1)).toDS

// Calculate Output
val outputDS: Dataset[ExampleOutput] = inputDS
  .withColumn("filledLater", lit(null))
  .as[ExampleOutput]

Expected Result Schema:

StructType(
  StructField("name", StringType, true),
  StructField("age", StringType, false),
  StructField("filledLater", StringType, true)
)

Observed Result Schema:

StructType(
  StructField("name", StringType, true),
  StructField("age", StringType, false),
  StructField("filledLater", NullType, true)
)

This can be solved by using .map(x => x: ExampleOutput), but this is less than desirable. Is there a better solution to automatically update the schema without manually casting the columns.


Solution

  • While it is impractical to achieve the exact thing what you are looking for, I recommend you either to

    1. generate a collection of field name to DataType (spark) mapping e.g. myTypes:Map[String, StructType] and you can simply cast to that type -
    inputDS
      .withColumn("filledLater", lit(null).cast(myTypes("filledLater")))
    

    In this way you don't have to use Dataset[T], rather use Dataframe

    Or

    1. Use some reflection API like "shapeless" to use similar casting as mentioned above and use your (configurable) case class..