apache-kafka, apache-kafka-streams, confluent-platform, ksqldb

Kafka Streams Windowed Key to Human Readable


I am doing windowed aggregation on a Kafka stream. It works fine and produces the correct aggregation. Here is the code in Scala; CallRecord is a case class.

    builder
      .stream[String, String](input_topic)
      .mapValues((elm) => {
        parse(elm).extract[CallRecord]
      })
      .groupBy((key, value) => {
        value.agentId
      })
      .windowedBy(every15Minute)
      .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
        CallRecordAggByAgent(
          callRecord.agentId,
          ((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
          ((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
          ((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
          aggregator.count + 1
        )
      })
      .mapValues((elm) => {
        write(elm)
      })
      .toStream
      .to(output_topic)

In the output topic I see a key something like this: [screenshot: output_topic key]

When I create a stream over this topic in ksqlDB, the values I see for ROWKEY look like 3w�H�@. I understand this is a deserialization issue: the key is a windowed key, not a plain string. I would like to either deserialize it directly in ksqlDB, or convert the window timestamp to a Long of epoch millis while streaming to output_topic. My understanding is that this should be easy to achieve, but I think I am missing some nuance here.
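For reference, one way to get the "Long of millis" variant is to drop the windowed wrapper and key the output records by the window start timestamp before writing to the output topic. The sketch below is illustrative only, not the code from this question: the topic names and the grouping are placeholders, the every15Minute definition is an assumption, and on Kafka Streams versions before 3.0 TimeWindows.of(...) would be used instead of ofSizeWithNoGrace.

    import java.time.Duration
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.kstream.TimeWindows

    val builder = new StreamsBuilder()
    // 15-minute tumbling windows (assumed definition of every15Minute)
    val every15Minute = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(15))

    builder
      .stream[String, String]("input_topic")
      .groupBy((_, value) => value) // placeholder grouping
      .windowedBy(every15Minute)
      .count()
      .toStream
      // Windowed[String] -> Long: keep only the window start in epoch millis,
      // which ksqlDB can then read as a BIGINT key
      .map((windowedKey, count) => (windowedKey.window().start(), count.toString))
      .to("output_topic")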


Solution

  • The solution I came up with is the following. Apparently it was not very hard.

    import io.circe.generic.auto._
    import org.json4s._
    import org.json4s.native.JsonMethods.parse // brings parse(...) into scope
    import org.json4s.native.Serialization.write

    // json4s needs an implicit Formats in scope for extract[CallRecord]
    implicit val formats: Formats = DefaultFormats

    builder
      .stream[String, String](args.INPUT_TOPIC)
      .mapValues((elm) => {
        parse(elm).extract[CallRecord]
      })
      .groupBy((key, value) => {
        value.agentId
      })
      .windowedBy(every15Minute)
      .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
        CallRecordAggByAgent(
          callRecord.agentId,
          ((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
          ((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
          ((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
          aggregator.count + 1
        )
      })
      .mapValues((elm) => {
        write(elm)
      })
      .toStream
      .selectKey((k, v) => {
        s"${k.key()}@${k.window().startTime().toEpochMilli.toString}"
      })
      .to(args.OUTPUT_TOPIC)
    

selectKey makes it possible to replace the record key, so before streaming to the output topic I extract the window start timestamp from the windowed key and turn the whole key into a plain string.
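To illustrate the result (this snippet is not part of the original answer, and the key value is made up), the composite key has the shape agentId@windowStartMillis, reads as a plain string in ksqlDB, and is trivial to split back apart downstream:

    import java.time.Instant

    // Hypothetical key, as produced by the selectKey above
    val compositeKey = "agent-42@1617184800000"
    val Array(agentId, startMillis) = compositeKey.split('@')
    val windowStart = Instant.ofEpochMilli(startMillis.toLong)
    // agentId == "agent-42", windowStart == 2021-03-31T10:00:00Z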