Tags: scala, apache-spark, apache-spark-dataset, type-parameter

How to create a Dataset with a case class type parameter? (Unable to find encoder for type T)


I'm trying to create a Dataset from an RDD of type T, which is known to be a case class and is passed as a type parameter of my function. The problem is that the implicit Encoders do not apply here. How should I constrain my type parameter to be able to create a Dataset?

I've tried declaring T as T: ClassTag and using an implicit ClassTag, but it didn't help. If I use this code with a concrete type instead of T, it works, so there is no problem with the specific class I want to pass (a basic case class).
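For reference, a non-generic variant along these lines compiles fine (MyCaseClass here is just a stand-in for my actual case class):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import sparkSession.implicits._  // provides the product Encoder for case classes

case class MyCaseClass(id: Long, name: String)  // stand-in for the real class

def createDatasetFromConcreteRDD(rdd: RDD[MyCaseClass]): Dataset[MyCaseClass] = {
  sparkSession.createDataset(rdd)  // Encoder[MyCaseClass] resolved implicitly
}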

In my use case I do other things in the function as well, but here is the basic problem:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import scala.reflect.ClassTag

def createDatasetFromRDD[T](rdd: RDD[T])(implicit tag: ClassTag[T]): Dataset[T] = {
  // Convert RDD to Dataset -- this is what fails to compile: no Encoder[T] in scope
  sparkSession.createDataset(rdd)
}

I get the error message:

error: Unable to find encoder for type T. An implicit Encoder[T] is needed to store T instances in a Dataset.
 Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

Any help or suggestions?

EDIT:

T is known to be a case class. I know case classes can use the product Encoder, so I basically want to let Scala know it can use that one. Kryo sounds good, but it does not provide the advantages of the product Encoder.
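For comparison, the Kryo route I'd like to avoid would look roughly like this (a sketch; Encoders.kryo only needs a ClassTag, but it stores each instance as one opaque binary column):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders}
import scala.reflect.ClassTag

def createDatasetWithKryo[T: ClassTag](rdd: RDD[T]): Dataset[T] = {
  // Compiles for any T, but serializes each row as a single binary blob,
  // losing the columnar schema and optimizations of the product Encoder.
  sparkSession.createDataset(rdd)(Encoders.kryo[T])
}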


Solution

  • I searched and found a solution that avoids Kryo, for when you know the product Encoder should be enough.

    TLDR

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Dataset, Encoders}
    import scala.reflect.runtime.universe.TypeTag

    def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T] = {
      // Convert RDD to Dataset, passing the product Encoder explicitly
      sparkSession.createDataset(rdd)(Encoders.product[T])
    }
    

    Explanation:

    Kryo has some disadvantages, which are explained here. Instead, why not use the product Encoder, which is actually the one Spark uses for case classes?

    So if I write:

      sparkSession.createDataset(rdd)(Encoders.product[T])
    

    I get the error type arguments [T] do not conform to method product's type parameter bounds [T <: Product]. Alright then, let's add the Product upper bound:

    def createDatasetFromRDD[T <: Product](rdd: RDD[T]): Dataset[T]
    

    Now I get No TypeTag available for T. That's OK, let's add a TypeTag!

    def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T]
    

    And that's it! Now you can provide a case class type to this function and the product Encoder will be used, with no other code needed. If your class doesn't satisfy [T <: Product], you may want to look into kode's answer.
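    As a usage sketch (Person is a hypothetical example class, not from the original question):

    case class Person(name: String, age: Int)  // hypothetical case class

    val rdd = sparkSession.sparkContext.parallelize(Seq(Person("Ada", 36)))
    val ds: Dataset[Person] = createDatasetFromRDD(rdd)  // product Encoder picked via the TypeTag
    ds.printSchema()  // real columns (name: string, age: int), not one binary blob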

    EDIT

    As commented by Luis Miguel Mejía Suárez, another solution is to provide an encoder like this:

    def createDatasetFromRDD[T : Encoder](rdd: RDD[T]): Dataset[T]
    

    and the caller is then responsible for having the encoder in implicit scope. If T is a case class, a simple import spark.implicits._ will be enough; if not, the user has to provide the Kryo encoder themselves.
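    For illustration (again with a hypothetical Person case class), the call site would then look like this:

    import sparkSession.implicits._  // brings Encoder[Person] into implicit scope

    case class Person(name: String, age: Int)  // hypothetical case class
    val ds = createDatasetFromRDD(sparkSession.sparkContext.parallelize(Seq(Person("Ada", 36))))

    And for a class that is not a Product, the caller can supply a Kryo encoder instead, e.g. implicit val enc: Encoder[SomeClass] = Encoders.kryo[SomeClass].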