I want to process various generic data sources read from Kafka, so I wrote the following code:
def accessKafkaSource[T: ClassTag](sEnv: StreamExecutionEnvironment): DataStream[T] = {
  val kafkaSource: KafkaSource[T] = KafkaSource.builder()
    .setBootstrapServers("")
    .setGroupId("")
    .setTopics("test")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
    .setValueOnlyDeserializer(new AbstractDeserializationSchema[T]() {
      override def deserialize(msg: Array[Byte]): T = {
        // JSONUtil.toBean(StrUtil.str(msg, StandardCharsets.UTF_8), classOf[T])
        JSONUtil.toBean(StrUtil.str(msg, StandardCharsets.UTF_8), classTag[T].runtimeClass)
      }
    })
    .build()
The commented-out line fails with the error "class type required but T found", so I changed it to use classTag[T].runtimeClass. That causes a new error: "type mismatch; found: _$1 where type _$1, required: T". How can I make this work?
As AminMal notes, runtimeClass is not guaranteed to return the class object of T, only the class that T erases to at runtime. AnyVals in particular will break this.
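A minimal sketch of the AnyVal problem, using only the Scala standard library. The object name RuntimeClassDemo and the helper runtimeClassOf are made up for illustration. For Int, the tag hands back the JVM primitive class, and a boxed value is never an instance of a primitive class:

```scala
import scala.reflect.{ClassTag, classTag}

object RuntimeClassDemo {
  // Hypothetical helper: exposes whatever class the ClassTag reports for T.
  def runtimeClassOf[T: ClassTag]: Class[_] = classTag[T].runtimeClass

  def main(args: Array[String]): Unit = {
    // For a plain reference type, the runtime class is the expected one.
    println(runtimeClassOf[String] == classOf[String])

    // For Int, the runtime class is the JVM primitive `int`.
    println(runtimeClassOf[Int].isPrimitive)

    // The literal 1 is boxed when passed as an Object, and
    // Class.isInstance always returns false for a primitive class.
    println(runtimeClassOf[Int].isInstance(1))
  }
}
```

Anything that tries to build or check a value against such a class (for example a JSON library expecting a bean class) is likely to misbehave, which is why the approach below restricts T to AnyRef.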
If everything you wish to deserialize is an AnyRef (this is likely the case), you can often safely cast the result of runtimeClass:
def kindaSafeClass[T <: AnyRef : ClassTag]: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
The case where this is unsafe is when T is itself generic (because of erasure), as can be seen by:
val clazz = kindaSafeClass[List[String]]
val lst = List(1)
val cast =
  if (clazz.isInstance(lst)) {
    println(s"$lst is an instance of $clazz")
    clazz.cast(lst)
  } else ???
println(cast)
println(cast.head.isEmpty)
which will print "List(1) is an instance of class scala.collection.immutable.List", then "List(1)", and then blow up with a ClassCastException when we try to cast 1 to a String.
But if your T will always be an AnyRef and you can be sure that it's not generic, you can write:
// Note: T must not be generic (e.g. List[String])
def accessKafkaSource[T <: AnyRef : ClassTag](sEnv: StreamExecutionEnvironment): DataStream[T] =
  // as before until...
        JSONUtil.toBean(StrUtil.str(msg, StandardCharsets.UTF_8), kindaSafeClass[T])
  // as before...
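To see why the cast removes the type mismatch from the question, here is a small self-contained sketch that needs neither Flink nor Hutool. toBeanStub is a hypothetical stand-in that only mimics the assumed shape of JSONUtil.toBean (a String plus a Class[A], returning an A). It ignores its input and simply calls the no-argument constructor. ClassCastSketch and decode are likewise illustrative names:

```scala
import scala.reflect.{ClassTag, classTag}

object ClassCastSketch {
  // Hypothetical stand-in for a method shaped like toBean(json, Class[A]): A.
  // It does no JSON parsing; it only instantiates A via its no-arg constructor.
  def toBeanStub[A](json: String, clazz: Class[A]): A =
    clazz.getDeclaredConstructor().newInstance()

  // Same helper as in the answer: narrows Class[_] to Class[T] for non-generic AnyRefs.
  def kindaSafeClass[T <: AnyRef : ClassTag]: Class[T] =
    classTag[T].runtimeClass.asInstanceOf[Class[T]]

  // Does not compile: runtimeClass is a Class[_], so the compiler infers an
  // unknown result type instead of T and reports a type mismatch.
  // def broken[T: ClassTag](json: String): T =
  //   toBeanStub(json, classTag[T].runtimeClass)

  // Compiles: the Class[T] makes the compiler infer A = T.
  def decode[T <: AnyRef : ClassTag](json: String): T =
    toBeanStub(json, kindaSafeClass[T])

  def main(args: Array[String]): Unit = {
    val sb: java.lang.StringBuilder = decode[java.lang.StringBuilder]("{}")
    println(sb.getClass.getName)
  }
}
```

The only thing the cast changes is the static type the compiler sees. At runtime the same Class object is passed either way, which is why the caveat about generic T still applies.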