scala, apache-flink, flink-streaming

How to get rid of a class type mismatch


I want to process data sources of various generic types read from Kafka, so I wrote the following code:

  def accessKafkaSource[T: ClassTag](sEnv: StreamExecutionEnvironment): DataStream[T] = {
    val kafkaSource: KafkaSource[T] = KafkaSource.builder()
      .setBootstrapServers("")
      .setGroupId("")
      .setTopics("test")
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
      .setValueOnlyDeserializer(new AbstractDeserializationSchema[T]() {
        override def deserialize(msg: Array[Byte]): T = {
           // JSONUtil.toBean(StrUtil.str(msg, StandardCharsets.UTF_8), classOf[T])
           JSONUtil.toBean(StrUtil.str(msg, StandardCharsets.UTF_8), classTag[T].runtimeClass)
        }
      })
      .build()

Since the commented-out line fails with the error "class type required but T found", I switched to classTag[T].runtimeClass, but that causes a new error: "type mismatch; found: _$1 where type _$1, required: T". How can I make this work?


Solution

  • As AminMal notes, runtimeClass is not guaranteed to return the class object of T, only the class that T erases to at runtime. AnyVals in particular will break this.
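
    A minimal sketch of the AnyVal case, using only the Scala standard library (runtimeClassOf is a helper name made up for this illustration): for Int, the ClassTag reports the primitive int class, which no boxed value is an instance of, while for a reference type such as String it reports the expected class.

    ```scala
    import scala.reflect.{ClassTag, classTag}

    object RuntimeClassDemo {
      // Hypothetical helper: the runtime class recorded by the ClassTag for T.
      def runtimeClassOf[T: ClassTag]: Class[_] = classTag[T].runtimeClass

      def main(args: Array[String]): Unit = {
        val intClass = runtimeClassOf[Int]
        val stringClass = runtimeClassOf[String]

        println(intClass.isPrimitive)                     // true: the primitive int class
        println(intClass == classOf[java.lang.Integer])   // false: not the boxed class
        println(intClass.isInstance(Integer.valueOf(1)))  // false: boxed values are not instances of it
        println(stringClass == classOf[String])           // true: reference types behave as expected
      }
    }
    ```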

    If everything you wish to deserialize is an AnyRef (this is likely the case), you can often safely cast the result of runtimeClass:

    def kindaSafeClass[T <: AnyRef : ClassTag]: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
    

    This cast is unsafe when generics are involved, because type arguments are erased at runtime, as can be seen by

    val clazz = kindaSafeClass[List[String]]
    val lst = List(1)
    
    val cast =
      if (clazz.isInstance(lst)) {
        println(s"$lst is an instance of $clazz")
        clazz.cast(lst)
      } else ???
    
    println(cast)
    println(cast.head.isEmpty)
    

    which will print List(1) is an instance of class scala.collection.immutable.List, then List(1), and then blow up with a ClassCastException when we try to cast 1 to a String.

    But if your T will always be an AnyRef and you can be sure that it is not generic, you can write:

    // Note: T must not be generic (e.g. List[String])
    def accessKafkaSource[T <: AnyRef : ClassTag](sEnv: StreamExecutionEnvironment): DataStream[T] =
      // as before until...
      JSONUtil.toBean(StrUtil.str(msg, StandardCharsets.UTF_8), kindaSafeClass[T])
      // as before...
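
    The pattern above can be tried without Flink or a JSON library. The following is a minimal sketch, not the original code: toBean is a hypothetical stand-in that only assumes the library call has the shape toBean(json, Class[A]): A. It ignores its JSON argument and simply calls the no-argument constructor, so it shows the typing, not real deserialization.

    ```scala
    import scala.reflect.{ClassTag, classTag}

    object ClassTagCastDemo {
      // Hypothetical stand-in for a call like JSONUtil.toBean: the result type
      // is tied to the type parameter of the Class argument. It does not parse
      // the JSON; it only instantiates the class.
      def toBean[A](json: String, clazz: Class[A]): A =
        clazz.getDeclaredConstructor().newInstance()

      // Same helper as in the answer: only reasonable for non-generic AnyRef types.
      def kindaSafeClass[T <: AnyRef : ClassTag]: Class[T] =
        classTag[T].runtimeClass.asInstanceOf[Class[T]]

      // Compiles: kindaSafeClass[T] is a Class[T], so toBean returns a T.
      def decode[T <: AnyRef : ClassTag](json: String): T =
        toBean(json, kindaSafeClass[T])

      // Does not compile: runtimeClass is typed as Class[_], so toBean would
      // return the unknown element type instead of T.
      // def broken[T: ClassTag](json: String): T =
      //   toBean(json, classTag[T].runtimeClass)

      def main(args: Array[String]): Unit = {
        val sb: java.lang.StringBuilder = decode[java.lang.StringBuilder]("{}")
        println(sb.length) // 0: a fresh, empty StringBuilder
      }
    }
    ```

    The commented-out variant fails with a type mismatch like the one in the question, which is why the cast to Class[T] is needed.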