scala, apache-spark

Spark: RuntimeException: java.lang.String is not a valid external type for schema of date


This works fine:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType}
import org.apache.spark.sql.functions._

val df = Seq( ("2018-01-01", "2018-01-31", 80)
            , ("2018-01-07","2018-01-10", 10)
            , ("2018-01-07","2018-01-31", 10)
            , ("2018-01-11","2018-01-31", 5)
            , ("2018-01-25","2018-01-27", 5)
            , ("2018-02-02","2018-02-23", 100)
            ).toDF("sd","ed","coins")

val schema = List(("sd", "date"), ("ed", "date"), ("coins", "integer"))
val newColumns = schema.map(c => col(c._1).cast(c._2))
val newDF = df.select(newColumns:_*)
newDF.show(false)

and is a work-around for the issue I present below:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType}

val someData = Seq(
                     Row("2018-01-01","2018-01-31", 80)
                   , Row("2018-01-07","2018-01-10", 10)
                   , Row("2018-01-07","2018-01-31", 10)
                   , Row("2018-01-11","2018-01-31", 5)
                   , Row("2018-01-25","2018-01-27", 5)
                   , Row("2018-02-02","2018-02-23", 100)
                  )

val someSchema = List(
  StructField("sd", DateType, true),
  StructField("ed", DateType, true),
  StructField("coins", IntegerType, true),
)

val dfA = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)

dfA.show(false)

generates an error as follows:

Caused by: RuntimeException: java.lang.String is not a valid external type for schema of date

I am aware of the int/bigint issue and show a work-around for date in the first snippet, but I cannot seem to use DateType directly - I would like to know how to make the 2nd snippet work, continuing in this vein.
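
For reference, a minimal sketch of what the error is pointing at (my own illustration, not part of the original snippets): the external JVM type Spark expects in a Row for a DateType field is java.sql.Date, not String, so building the Rows with java.sql.Date values satisfies the schema:

import java.sql.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, DateType}

// Assumption: supplying java.sql.Date values directly so the Row contents
// match the DateType fields declared in the schema.
val dateData = Seq(
  Row(Date.valueOf("2018-01-01"), Date.valueOf("2018-01-31"), 80),
  Row(Date.valueOf("2018-02-02"), Date.valueOf("2018-02-23"), 100)
)

val dateSchema = StructType(List(
  StructField("sd", DateType, true),
  StructField("ed", DateType, true),
  StructField("coins", IntegerType, true)
))

val dfB = spark.createDataFrame(
  spark.sparkContext.parallelize(dateData),
  dateSchema
)

dfB.show(false)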


Solution

  • After much experimentation, trying df.rdd.map{...} and asInstanceOf[Date], no joy.

    My first approach as stated above is the way to go:

    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType}
    import org.apache.spark.sql.functions._
    
    val df = Seq( ("2018-01-01", "2018-01-31", 80)
                , ("2018-01-07","2018-01-10", 10)
                , ("2018-01-07","2018-01-31", 10)
                , ("2018-01-11","2018-01-31", 5)
                , ("2018-01-25","2018-01-27", 5)
                , ("2018-02-02","2018-02-23", 100)
                ).toDF("sd","ed","coins")
    
    val schema = List(("sd", "date"), ("ed", "date"), ("coins", "integer"))
    val newColumns = schema.map(c => col(c._1).cast(c._2))
    val newDF = df.select(newColumns:_*)
    newDF.show(false)
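
    If you prefer an explicit parse format over a plain cast, a small variant (my own sketch, assuming the yyyy-MM-dd format shown in the data) uses to_date from org.apache.spark.sql.functions:

    import org.apache.spark.sql.functions.{col, to_date}
    
    // Assumption: the same df as above; to_date with an explicit pattern
    // makes the expected string format visible and yields DateType columns.
    val parsedDF = df
      .withColumn("sd", to_date(col("sd"), "yyyy-MM-dd"))
      .withColumn("ed", to_date(col("ed"), "yyyy-MM-dd"))
    
    parsedDF.printSchema()
    parsedDF.show(false)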