While working on a Scala implementation of a Kafka Streams KeyValueMapper, I am getting the following error. I am not sure what exactly the difference is.
Thanks for your help.
Code:
I created a KTable from a topic.
val creducer: Reducer[java.lang.Long] =
  (v1, v2) => if (v1 > v2) v1 else v2
val deduplicationWindow = TimeWindows
  .of(60000L * 10)
  .advanceBy(60000L)
  .until(60000L * 10)
val ktwindow: KTable[Windowed[String], java.lang.Long] =
  ipandTime
    .groupByKey(Serdes.String(), Serdes.Long())
    .reduce(creducer, deduplicationWindow, "ktwindow-query")
I get the error when I call the selectKey method on the stream whose key is Windowed[String]. A similar implementation in Java works fine.
val fStream = ktwindow
  .toStream()
  .selectKey(
    new KeyValueMapper[Windowed[String],
                       java.lang.Long,
                       KeyValue[String, java.lang.Long]] {
      override def apply(
          key: Windowed[String],
          value: java.lang.Long): KeyValue[String, java.lang.Long] = {
        new KeyValue(key.key(), value)
      }
    }
  )
[error] found : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]
[error] required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]
The variable ipandTime was nowhere to be found, so I've replaced it with ???, but that has nothing to do with the actual problem.
As I said, if type inference for Java use-site wildcards fails, simply add explicit type arguments. The following compiles against Kafka 1.1.0:
import org.apache.kafka.streams.kstream._
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.KeyValue

object Q49594920 {
  val creducer: Reducer[java.lang.Long] =
    (v1, v2) => if (v1 > v2) v1 else v2
  val deduplicationWindow = TimeWindows
    .of(60000L * 10)
    .advanceBy(60000L)
    .until(60000L * 10)
  val ktwindow: KTable[Windowed[String], java.lang.Long] = ???
  // ipandTime // What's that? It's not defined anywhere!
  //   .groupByKey(Serdes.String(), Serdes.Long())
  //   .reduce(creducer, deduplicationWindow, "ktwindow-query")
  val fStream = ktwindow
    .toStream()
    .selectKey[KeyValue[String, java.lang.Long]](
      new KeyValueMapper[Windowed[String],
                         java.lang.Long,
                         KeyValue[String, java.lang.Long]] {
        override def apply(
            key: Windowed[String],
            value: java.lang.Long): KeyValue[String, java.lang.Long] = {
          new KeyValue(key.key(), value)
        }
      }
    )
}
The selectKey method expects a generic type argument KR that the compiler could not infer here, so I passed the concrete type KeyValue[String, java.lang.Long] explicitly, and then it compiled.
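One thing to keep in mind: whatever the mapper passed to selectKey returns becomes the new key, so with KR = KeyValue[String, java.lang.Long] the resulting stream is keyed by a KeyValue. If the intent was instead to re-key the stream by the plain String inside the window, the same explicit-type-argument trick might look like the sketch below. It assumes Kafka Streams 1.1.0 on the classpath; the names RekeyByString, stripWindow and rekey are made up for illustration and do not come from the question.

```scala
import org.apache.kafka.streams.kstream._

object RekeyByString {
  // Hypothetical helper: maps a windowed key to the plain String key it wraps.
  // The value is ignored because selectKey only replaces the key.
  val stripWindow: KeyValueMapper[Windowed[String], java.lang.Long, String] =
    new KeyValueMapper[Windowed[String], java.lang.Long, String] {
      override def apply(key: Windowed[String], value: java.lang.Long): String =
        key.key()
    }

  // KR is passed explicitly as String, so scalac does not have to infer it
  // through the Java use-site wildcard `? extends KR`.
  def rekey(ktwindow: KTable[Windowed[String], java.lang.Long]): KStream[String, java.lang.Long] =
    ktwindow.toStream().selectKey[String](stripWindow)
}
```

Taking the KTable as a parameter avoids the ??? placeholder, so the object can be loaded without throwing; whether String or KeyValue is the right key type depends on what the downstream code expects.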