Tags: scala, apache-spark, reflection, scala-reflect

Load Dataset from Dynamically generated Case Class


What is needed:

The number of tables in the source database changes rapidly, so I don't want to edit case classes by hand; instead I generate them dynamically through Scala code and put them in a package. But now I am not able to read them back dynamically. If this works, I would iterate over the members of the schema object, com.example.datasources.fileSystemSource.schema.{}, in a loop (a sketch of that enumeration follows below).
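
A minimal sketch of that enumeration (assuming the schema object shown below and scala-reflect on the classpath) could list the nested case classes at runtime:

import scala.reflect.runtime.universe._

val mirror = runtimeMirror(getClass.getClassLoader)
val schemaModule =
  mirror.staticModule("com.example.datasources.fileSystemSource.schema")

// Collect the fully qualified names of all case classes declared in the object.
val tableClassNames: List[String] = schemaModule.typeSignature.decls
  .collect { case s if s.isClass && s.asClass.isCaseClass => s.fullName }
  .toList
// e.g. List("com.example.datasources.fileSystemSource.schema.Users",
//           "com.example.datasources.fileSystemSource.schema.UserData")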

What has already been done:

I have some case classes generated dynamically from the schema of the database tables, as below:

import java.sql.Timestamp

object schema {
  case class Users(name: String,
                   favorite_color: String,
                   favorite_numbers: Array[Int])

  case class UserData(registration_dttm: Timestamp,
                      id: Int,
                      first_name: String,
                      last_name: String,
                      email: String,
                      gender: String,
                      ip_address: String,
                      cc: String,
                      country: String,
                      birthdate: String,
                      salary: Double,
                      title: String,
                      comments: String)
}

Then I have used them as a dynamic type to read in the Load[T] class in my Loader.scala, as below:

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

class Load[T <: Product: Encoder](val tableName: String,
                                  val inputPath: String,
                                  val spark: SparkSession,
                                  val saveMode: String,
                                  val outputPath: String,
                                  val metadata: Boolean)
    extends Loader[T] {

  val fileSystemSourceInstance: FileSystem[T] =
    new FileSystem[T](inputPath, spark, saveMode, tableName)

  override def Load: Dataset[T] =
    fileSystemSourceInstance.provideData(metadata, outputPath).as[T]

}
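
(Loader and FileSystem are not shown in the question; a hypothetical minimal sketch of them, just so the snippet above is self-contained, might look like this:)

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical stand-ins, assumed only for completeness of the example.
trait Loader[T] {
  def Load: Dataset[T]
}

class FileSystem[T](inputPath: String,
                    spark: SparkSession,
                    saveMode: String,
                    tableName: String) {
  // Stand-in implementation: read the table's files as an untyped DataFrame;
  // Load then applies the Encoder from its context bound via .as[T].
  def provideData(metadata: Boolean, outputPath: String): DataFrame =
    spark.read.parquet(s"$inputPath/$tableName")
}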

Now, by using reflect.api, I am able to get a TypeTag for my case classes:

import scala.reflect.api
import scala.reflect.runtime.universe._

def stringToTypeTag[A](name: String): TypeTag[A] = {
  val c = Class.forName(name)
  val mirror = runtimeMirror(c.getClassLoader)
  val sym = mirror.staticClass(name)
  val tpe = sym.selfType
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
      if (m eq mirror) tpe.asInstanceOf[U#Type]
      else throw new IllegalArgumentException(
        s"Type tag defined in $mirror cannot be migrated to other mirrors.")
  })
}

So if I now print my case class type tag, I get:

val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users")
println(typetagDynamic)
// prints: TypeTag[com.example.datasources.fileSystemSource.schema.Users]

Problem:

I need to use these TypeTags (or the dynamically generated case classes) to encode my datasets, as below:

new Load[typetagDynamic](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic]).Load

This is giving me the error: Cannot resolve symbol typetagDynamic

If used like this:

new Load[typetagDynamic.type](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic.type]).Load

This is giving me the error: type arguments [T] do not conform to method product's type parameter bounds [T <: Product]


Solution

  • If you know a type schema.Users only at runtime, it cannot appear as a type argument directly: type parameters are resolved at compile time, while typetagDynamic is a runtime value (and its singleton type typetagDynamic.type is a subtype of TypeTag, not of Product, hence the bounds error above). Try to replace

    new Load[schema.Users](tableName, inputPath, spark,
      saveMode,
      outputPath + tableName,
      metadata).Load
    

    with

    import org.apache.spark.sql.Encoder
    import scala.reflect.runtime
    import scala.reflect.runtime.universe._

    val currentMirror = runtime.currentMirror

    // Reflect Load's constructor so it can be invoked for a type known only at runtime.
    val loadType = typeOf[Load[_]]
    val classSymbol = loadType.typeSymbol.asClass
    val classMirror = currentMirror.reflectClass(classSymbol)
    val constructorSymbol = loadType.decl(termNames.CONSTRUCTOR).asMethod
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)

    // Run the implicit search for Encoder[Users] at runtime inside a toolbox.
    import scala.tools.reflect.ToolBox
    val toolbox = ToolBox(currentMirror).mkToolBox()
    val encoderType = appliedType(
      typeOf[Encoder[_]].typeConstructor.typeSymbol,
      currentMirror.staticClass("com.example.datasources.fileSystemSource.schema.Users").toType
    )
    val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
    val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))

    // Pass the found Encoder explicitly as the trailing (implicit) constructor argument.
    constructorMirror(tableName, inputPath, spark,
      saveMode,
      outputPath + tableName,
      metadata, encoderInstance).asInstanceOf[Load[_]].Load
    
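    Note that the result is typed as Load[_], so the returned Dataset is a Dataset[_]: its element type is unknown statically, and downstream code has to treat it generically (or cast once the runtime class is known).
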

    If this fails with scala.tools.reflect.ToolBoxError: implicit search has failed, you need either:

    1. to define an instance of the type class org.apache.spark.sql.Encoder for Users in its companion object (so that the instance will be in implicit scope):

      object Users {
        implicit val usersEnc: Encoder[Users] = spark.implicits.newProductEncoder[Users]
      }
      

    or

    2. to import instances of Encoder for case classes via import spark.implicits._; but you need to import them not into the current local scope but into the toolbox-generated local scope (the SparkSession must then be reachable through a stable path; see the sketch after this list). In this case you should replace

      val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
      val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))
      

      with

      val className = "com.example.datasources.fileSystemSource.schema.Users"
      val classType = currentMirror.staticClass(className).toType
      // "path.to.spark" below is a placeholder for a stable path to the SparkSession.
      val encoderInstance = toolbox.eval(
        q"""import path.to.spark.implicits._
            import org.apache.spark.sql.Encoder
            implicitly[Encoder[$classType]]""")
      
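    In both options the SparkSession has to be reachable through a stable, fully qualified path (path.to.spark above stands for such a path). A hypothetical holder object, assumed here purely for illustration, could provide one:

      // Hypothetical holder giving the SparkSession a stable path, so that
      // toolbox-compiled code (and companion objects) can import its implicits.
      package com.example

      import org.apache.spark.sql.SparkSession

      object SparkHolder {
        val spark: SparkSession =
          SparkSession.builder
            .master("local[*]")
            .appName("dynamic-encoders")
            .getOrCreate()
      }

    The quasiquoted import then becomes q"import com.example.SparkHolder.spark.implicits._".
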

    See the whole code: https://gist.github.com/DmytroMitin/2cad52c27f5360ae9b1e7503d6f6cd00

    See also: https://groups.google.com/g/scala-internals/c/ta-vbUT6JE8