I am doing windowed aggregation on a Kafka stream. It works fine and produces the aggregation I expect. Here is the code in Scala (`CallRecord` is a case class).
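For context, a minimal sketch of the types and window definition the snippet references; the field types, `CallRecordAggByAgent.empty`, `.roundD`, and `every15Minute` are not shown in the original, so everything below is an assumption:

```scala
import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows

// Input record parsed from JSON (field types are assumptions)
case class CallRecord(
  agentId: String,
  durationMinutes: Double,
  waitingMinutes: Double,
  customerScore: Double
)

// Per-agent running aggregate; count starts at 1 so the first record
// divides by 1 (assumption inferred from the aggregation code)
case class CallRecordAggByAgent(
  agentId: String,
  durationMinutesAvg: Double,
  waitingMinutesAvg: Double,
  scoreAvg: Double,
  count: Long
)

object CallRecordAggByAgent {
  val empty: CallRecordAggByAgent =
    CallRecordAggByAgent("", 0.0, 0.0, 0.0, 1L)
}

// `.roundD` is presumably a user-defined rounding helper,
// e.g. round to two decimal places
implicit class DoubleOps(private val d: Double) extends AnyVal {
  def roundD: Double = math.round(d * 100).toDouble / 100
}

// 15-minute tumbling windows
val every15Minute = TimeWindows.of(Duration.ofMinutes(15))
```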
```scala
builder
  .stream[String, String](input_topic)
  .mapValues(elm => parse(elm).extract[CallRecord])
  .groupBy((_, value) => value.agentId)
  .windowedBy(every15Minute)
  .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
    // Incremental mean: avg + (x - avg) / n, with count starting at 1
    CallRecordAggByAgent(
      callRecord.agentId,
      (aggregator.durationMinutesAvg + (callRecord.durationMinutes - aggregator.durationMinutesAvg) / aggregator.count).roundD,
      (aggregator.waitingMinutesAvg + (callRecord.waitingMinutes - aggregator.waitingMinutesAvg) / aggregator.count).roundD,
      (aggregator.scoreAvg + (callRecord.customerScore - aggregator.scoreAvg) / aggregator.count).roundD,
      aggregator.count + 1
    )
  })
  .mapValues(elm => write(elm))
  .toStream
  .to(output_topic)
```
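As an aside, the usual recurrence for a running average is `newAvg = avg + (x - avg) / n`, where `n` counts the values including the new one. A quick standalone check of that recurrence (plain Scala, no Kafka dependencies):

```scala
// One step of the incremental mean; n is the number of values seen
// so far, including x
def step(avg: Double, x: Double, n: Long): Double = avg + (x - avg) / n

// Fold 2, 4, 6 starting from avg = 0 and n = 1; the result is their mean
val (mean, _) = List(2.0, 4.0, 6.0).foldLeft((0.0, 1L)) {
  case ((avg, n), x) => (step(avg, x, n), n + 1)
}
// mean == 4.0
```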
In the output topic the keys come out garbled. When I create a stream on this topic in ksqlDB, I see ROWKEY values like `3w�H�@`. I understand this is a deserialization issue: after the windowed aggregation the key is a `Windowed[String]`, and the windowed serde writes the original key bytes together with a binary window-start timestamp, which a plain string deserializer cannot render. I would like to either deserialize this directly in ksqlDB or turn the window start into epoch millis while streaming to the output topic. My understanding is that this should be easy to achieve, but I think I am missing some nuance here.
The solution I came up with is the following; it turned out not to be hard.
```scala
import io.circe.generic.auto._
import org.json4s._
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write

// json4s needs an implicit Formats in scope for extract[CallRecord]
implicit val formats: Formats = DefaultFormats

builder
  .stream[String, String](args.INPUT_TOPIC)
  .mapValues(elm => parse(elm).extract[CallRecord])
  .groupBy((_, value) => value.agentId)
  .windowedBy(every15Minute)
  .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
    // Incremental mean: avg + (x - avg) / n, with count starting at 1
    CallRecordAggByAgent(
      callRecord.agentId,
      (aggregator.durationMinutesAvg + (callRecord.durationMinutes - aggregator.durationMinutesAvg) / aggregator.count).roundD,
      (aggregator.waitingMinutesAvg + (callRecord.waitingMinutes - aggregator.waitingMinutesAvg) / aggregator.count).roundD,
      (aggregator.scoreAvg + (callRecord.customerScore - aggregator.scoreAvg) / aggregator.count).roundD,
      aggregator.count + 1
    )
  })
  .mapValues(elm => write(elm))
  .toStream
  .selectKey((k, _) => s"${k.key()}@${k.window().startTime().toEpochMilli}")
  .to(args.OUTPUT_TOPIC)
```
`selectKey` makes it possible to set a new key on the stream, so before writing to the output topic I extract the window-start timestamp from the windowed key and combine it with the original key into a string.
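With keys in this `agentId@startMillis` shape, downstream consumers can recover the window start by splitting on the separator. A small sketch (`parseKey` is a hypothetical helper; the `@` separator is the one chosen above):

```scala
// Split a composite key like "agent42@1612345678000" into the original
// key and the window-start epoch millis; split on the last '@' in case
// the agent id itself contains one
def parseKey(key: String): (String, Long) = {
  val idx = key.lastIndexOf('@')
  (key.substring(0, idx), key.substring(idx + 1).toLong)
}
// parseKey("agent42@1612345678000") == ("agent42", 1612345678000L)
```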