I'm trying to create a Dataset from an RDD of type T, which is known to be a case class and is passed as a parameter of my function. The problem is that the implicit Encoders do not apply here. How should I constrain my type parameter so I can create a Dataset?
I've tried declaring T as T: ClassTag and using an implicit ClassTag, but neither helped. If I write the same code with the concrete type instead of T, it works, so there is no problem with the specific class I want to pass (a basic case class).
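For reference, here is a minimal sketch of the concrete version that compiles (Person is a hypothetical case class, and sparkSession is assumed to be in scope):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset

case class Person(name: String, age: Int)

import sparkSession.implicits._ // brings Encoder[Person] into implicit scope

def createPersonDataset(rdd: RDD[Person]): Dataset[Person] = {
  // Compiles fine: the product Encoder for Person is resolved implicitly
  sparkSession.createDataset(rdd)
}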
In my use case I do other things in the function, but here is the basic problem:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import scala.reflect.ClassTag

def createDatasetFromRDD[T](rdd: RDD[T])(implicit tag: ClassTag[T]): Dataset[T] = {
  // Convert RDD to Dataset -- fails to compile: no Encoder[T] in scope
  sparkSession.createDataset(rdd)
}
I get the error message:
error: Unable to find encoder for type T. An implicit Encoder[T] is needed to store T instances in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Any help or suggestions?
EDIT:
T is known to be a case class. I know case classes can use the product Encoder, so I basically want to let Scala know it can use that one. Kryo sounds good, but it does not provide the advantages of the product Encoder.
I searched and found a solution that avoids Kryo when you know the product Encoder should be enough.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders}
import scala.reflect.runtime.universe.TypeTag

def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T] = {
  // Convert RDD to Dataset using the product Encoder explicitly
  sparkSession.createDataset(rdd)(Encoders.product[T])
}
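A quick usage sketch before walking through how I got there (Person is a hypothetical case class):

case class Person(name: String, age: Int)

val rdd: RDD[Person] = sparkSession.sparkContext.parallelize(
  Seq(Person("Alice", 30), Person("Bob", 25))
)

// Person extends Product (all case classes do) and the compiler
// materializes the TypeTag at the call site, so no extra code is needed
val ds: Dataset[Person] = createDatasetFromRDD(rdd)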
Kryo has some disadvantages, which are explained here. Instead, why not use the product Encoder, which is actually the one Spark uses for case classes?
So if I write:
sparkSession.createDataset(rdd)(Encoders.product[T])
I get the error type arguments [T] do not conform to method product's type parameter bounds [T <: Product]. Alright then, let's mention Product:
def createDatasetFromRDD[T <: Product](rdd: RDD[T]): Dataset[T]
Now I get No TypeTag available for T. That's OK, let's put a TypeTag!
def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T]
And that's it! Now you can provide a case class type to this function and the product Encoder will be used, with no other code needed. In case your class doesn't satisfy [T <: Product], you may want to look into kode's answer.
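For those non-Product types, a Kryo-based variant might look like this (a sketch of the idea, not necessarily kode's exact code; note that Encoders.kryo serializes each object into a single binary column, so you lose the columnar schema the product Encoder gives you):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import scala.reflect.ClassTag

def createDatasetFromRDDKryo[T: ClassTag](rdd: RDD[T]): Dataset[T] = {
  // Fall back to Kryo serialization for arbitrary (serializable) types
  implicit val encoder: Encoder[T] = Encoders.kryo[T]
  sparkSession.createDataset(rdd)
}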
As commented by Luis Miguel Mejía Suárez, another solution is to require an Encoder like this:
def createDatasetFromRDD[T : Encoder](rdd: RDD[T]): Dataset[T]
and the caller has the responsibility of providing the Encoder in implicit scope. If T is a case class, a simple import spark.implicits._ is enough; if not, the caller has to provide the Kryo encoder.
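Here is a sketch of what the two kinds of callers might look like under this approach (Person and Foo are hypothetical):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Case class caller: the implicits import supplies the product Encoder
import sparkSession.implicits._
case class Person(name: String, age: Int)
val personRdd: RDD[Person] = sparkSession.sparkContext.parallelize(Seq(Person("Alice", 30)))
val ds: Dataset[Person] = createDatasetFromRDD(personRdd)

// Non-Product caller: supply a Kryo encoder explicitly
class Foo(val x: Int) extends Serializable
implicit val fooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
val fooRdd: RDD[Foo] = sparkSession.sparkContext.parallelize(Seq(new Foo(1)))
val fooDs: Dataset[Foo] = createDatasetFromRDD(fooRdd)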