java scala apache-kafka kafka-streams-scala

What is the difference between "found" and "required" in the following errors?


While working on a Scala implementation of a Kafka KeyValueMapper, I am getting the following error. I am not sure what exactly the difference between "found" and "required" is. Thanks for your help.

Code:

  1. I created a KTable from a topic.

    val creducer: Reducer[java.lang.Long] =
      (v1, v2) => if (v1 > v2) v1 else v2
    
    val deduplicationWindow = TimeWindows
      .of(60000L * 10)
      .advanceBy(60000L)
      .until(60000L * 10)
    
    val ktwindow: KTable[Windowed[String], java.lang.Long] =
      ipandTime
        .groupByKey(Serdes.String(), Serdes.Long())
        .reduce(creducer, deduplicationWindow, "ktwindow-query")
    
  2. I get an error from the selectKey method when I try to create a stream with a key of type Windowed[String]. A similar implementation in Java works fine.

    val fStream = ktwindow
      .toStream()
      .selectKey(
        new KeyValueMapper[Windowed[String],
                           java.lang.Long,
                           KeyValue[String, java.lang.Long]] {
          override def apply(
              key: Windowed[String],
              value: java.lang.Long): KeyValue[String, java.lang.Long] = {
            new KeyValue(key.key(), value)
          }
        }
      )
    
[error]  found   : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]

[error]  required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]

Solution

  • The variable ipandTime is not defined anywhere in the question, so I've replaced it with ???. That has nothing to do with the actual problem.

    If type inference for Java use-site wildcards fails, simply add explicit type arguments. The following compiles against Kafka 1.1.0:

    import org.apache.kafka.streams.kstream._
    import org.apache.kafka.common.serialization._
    import org.apache.kafka.streams.KeyValue
    
    
    object Q49594920 {
    
      val creducer: Reducer[java.lang.Long] =
        (v1, v2) => if (v1 > v2) v1 else v2
    
      val deduplicationWindow = TimeWindows
        .of(60000L * 10)
        .advanceBy(60000L)
        .until(60000L * 10)
    
      // ipandTime is not defined anywhere in the question, so the table is stubbed out.
      val ktwindow: KTable[Windowed[String], java.lang.Long] = ???
        // ipandTime
        //   .groupByKey(Serdes.String(), Serdes.Long())
        //   .reduce(creducer, deduplicationWindow, "ktwindow-query")
    
      val fStream = ktwindow
        .toStream()
        // Explicit type argument for KR, which the compiler does not infer here.
        .selectKey[KeyValue[String, java.lang.Long]](
          new KeyValueMapper[Windowed[String],
                             java.lang.Long,
                             KeyValue[String, java.lang.Long]] {
            override def apply(
                key: Windowed[String],
                value: java.lang.Long): KeyValue[String, java.lang.Long] = {
              new KeyValue(key.key(), value)
            }
          }
        )
    }
    

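    In the error message, "found" is the type of the expression that was actually passed, and "required" is the type that the method's parameter expects. The "required" line still mentions the unresolved type parameter KR, which shows that the compiler could not infer it. The selectKey method is generic in KR, so I passed the concrete type KeyValue[String, java.lang.Long] explicitly, and then it compiled.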
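
    The same fix can be tried without a Kafka dependency. The following is a minimal sketch only: Mapper, Windowed and Stream are hypothetical stand-ins, not the Kafka Streams classes, and selectKey merely copies the shape of the "required" type from the error (`_ >: K, _ >: V, _ <: KR`). For brevity the mapper returns a plain String as the new key, which is an assumption and differs from the KeyValue used above.

```scala
// Hypothetical stand-in for KeyValueMapper (not the Kafka class).
trait Mapper[K, V, R] {
  def apply(key: K, value: V): R
}

// Hypothetical stand-in for Windowed: just wraps a key.
final case class Windowed[K](key: K)

// Hypothetical stand-in for KStream, backed by an in-memory list.
final class Stream[K, V](records: List[(K, V)]) {
  // Same wildcard shape as the "required" type in the compiler error.
  def selectKey[KR](mapper: Mapper[_ >: K, _ >: V, _ <: KR]): List[(KR, V)] =
    records.map { case (k, v) => (mapper.apply(k, v): KR, v) }
}

object ExplicitTypeArgDemo {
  val stream = new Stream[Windowed[String], java.lang.Long](
    List((Windowed("10.0.0.1"), java.lang.Long.valueOf(42L)))
  )

  // KR is given explicitly, so nothing is left for the compiler to infer.
  val rekeyed: List[(String, java.lang.Long)] =
    stream.selectKey[String](
      new Mapper[Windowed[String], java.lang.Long, String] {
        override def apply(key: Windowed[String], value: java.lang.Long): String =
          key.key
      }
    )
}
```

    With the type argument written out, the anonymous Mapper instance only has to conform to a fully known parameter type, which is the same reason the explicit selectKey[KeyValue[String, java.lang.Long]] call above compiles.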