Tags: scala, apache-spark, schema

How to get the Spark schema for a case class, and define UDFs, with a missing TypeTag?


import org.apache.spark.sql.Encoders
import scala.reflect.runtime.universe._

case class StationX(
    stnId: Int,
    wbanId: Int,
    lat: Double,
    lon: Double
)

object Inst {
  def getSchema[T <: Product : TypeTag] = Encoders.product[T].schema
  val ss = getSchema[StationX]
}

The above fails to compile with the error: No TypeTag available for observatory.StationX at val ss = getSchema[StationX]

How can this code be corrected?

This works in the spark shell REPL but not in my project.

EDIT: Deleted the original details and replaced them with a complete way to reproduce.

A way to reproduce:

Download the project.

Check that main builds, then add this object:

package observatory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Encoders
import scala.reflect.runtime.universe._

case class CC(i: Int)
object SparkInstance {
  val spark = SparkSession
    .builder()
    .appName("Spark SQL UDF scalar example")
    .getOrCreate()

  // ** These don't work (or maybe just the def does) **

  def getSchema[T <: Product : TypeTag] = Encoders.product[T].schema

  // No TypeTag available for Double
  val random = udf(() => Math.random())

  // No TypeTag available for Int
  val plusOne = udf((x: Int) => x + 1)

  // No TypeTag available for observatory.CC
  val ss = getSchema[CC]
}

and build.

Uses: Scala 3.3.0. Spark 3.2 is excluded; to enable it, remove the excludes in build.sbt.


Solution

  • With these dependencies in build.sbt:

      ("org.apache.spark" %% "spark-sql" % "3.3.2").cross(CrossVersion.for3Use2_13),
      ("io.github.vincenzobaz" %% "spark-scala3-encoders" % "0.2.3"),
      ("io.github.vincenzobaz" %% "spark-scala3-udf" % "0.2.3"),
    

    and on the code side:

    package observatory
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.Encoder
    
    import scala3encoders.given
    
    import scala3udf.{Udf => udf}
    
    case class CC(i: Int)
    object SparkInstance {
      val spark = SparkSession
        .builder()
        .appName("Spark SQL UDF scalar example")
        .getOrCreate()
    
      def getSchema[T : Encoder] = implicitly[Encoder[T]].schema
    
      val random = udf(() => Math.random())
    
      val plusOne = udf((x: Int) => x + 1)
    
      val ss = getSchema[CC]
    }
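
    To sanity-check the derivation, here is a small runnable sketch (the local[*] master and the sample data are my additions, not part of the original project):

    package observatory

    import org.apache.spark.sql.SparkSession
    import scala3encoders.given

    @main def check(): Unit = {
      // Create a local session first so SparkInstance's getOrCreate reuses it.
      val spark = SparkSession.builder().appName("check").master("local[*]").getOrCreate()

      // Schema derived for CC by the spark-scala3 Encoder; prints a StructType tree.
      println(SparkInstance.ss.treeString)

      // createDataset resolves Encoder[CC] from scala3encoders.given.
      val ds = spark.createDataset(Seq(CC(1), CC(2)))
      ds.show()

      spark.stop()
    }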
    

    You were mixing mechanisms: the spark-scala3 docs state that you need to use its udf to work with the types correctly. Similarly, the getSchema function tried to create Encoders via Spark's built-in mechanism, which fails because it relies on Scala 2 runtime reflection (TypeTag); this is also why the original code works in the spark-shell REPL, which runs on Scala 2, but not in a Scala 3 project. Using an Encoder context bound instead lets the spark-scala3 Encoder derivation do the work.
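
    For reference, the same helper in more idiomatic Scala 3 (summon instead of implicitly; behavior is identical), assuming spark-scala3 is on the classpath:

    import org.apache.spark.sql.Encoder
    import scala3encoders.given

    // [T: Encoder] asks the compiler for a given Encoder[T]; spark-scala3
    // derives it at compile time, with no Scala 2 runtime TypeTag involved.
    def getSchema[T: Encoder] = summon[Encoder[T]].schema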