KStreams Grouping by multiple fields to get count


I have a bunch of records in a topic like the one below. I can write the GROUP BY in ksqlDB with no problem, since it is mostly SQL. But I have been tasked with moving it over to Java Kafka Streams (KStreams) and am failing miserably.

Can someone guide me on the topology for grouping first by user_id, then by object_id, then by day? I don't ask this lightly: I have tried over and over with state stores and many examples, but I am just chasing my tail. Basically, I would like to know how many times a user looked at a specific object on a given day.

Anything on how to accomplish this would be greatly appreciated.

{
    "entrytimestamp": "2020-05-04T15:21:01.897",
    "user_id": "080db36a-f205-4e32-a324-cc375b75d167",
    "object_id": "fdb084f7-5367-4776-a5ae-a10d6e898d22"
}

Solution

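  • You can build a composite key from the three fields, group by that key, and then count, like:

    // Message is the value class for the records; jsonSerde is its Serde<Message>.
    KStream<String, Message> stream = builder.stream(MESSAGES, Consumed.with(Serdes.String(), jsonSerde));
    // Re-key each record as "userId-objectId-day" so one key identifies one user/object/day.
    // Assumes message.timestamp() returns epoch milliseconds.
    KStream<String, Message> newKeyStream = stream.selectKey((key, message) ->
                String.format("%s-%s-%s",
                        message.userId(),
                        message.objectId(),
                        LocalDate.ofInstant(Instant.ofEpochMilli(message.timestamp()), ZoneId.systemDefault())));
    // selectKey marks the stream for repartitioning, so pass the serdes explicitly.
    KGroupedStream<String, Message> groupedBy = newKeyStream.groupByKey(Grouped.with(Serdes.String(), jsonSerde));
    // Number of records per composite key, i.e. views per user, object and day.
    KTable<String, Long> counts = groupedBy.count();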
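
The sample record in the question carries `entrytimestamp` as an ISO-8601 string without a zone offset, not as epoch milliseconds. If that string is what is available, the day part of the key could be derived roughly as below. This is only a sketch in plain Java with no Kafka dependency: `ViewKeys`, `compositeKey` and `countByKey` are hypothetical names, the `|` separator is an assumption (the UUIDs already contain `-`), and the map-based count is an in-memory stand-in for `groupByKey().count()`, not the real topology.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original answer.
final class ViewKeys {
    private ViewKeys() {}

    // Builds "userId|objectId|yyyy-MM-dd" from the fields of one record.
    static String compositeKey(String userId, String objectId, String entryTimestamp) {
        // The timestamp has no zone offset, so it is parsed as a local date-time
        // and truncated to its calendar day.
        LocalDate day = LocalDateTime.parse(entryTimestamp).toLocalDate();
        return userId + "|" + objectId + "|" + day;
    }

    // In-memory stand-in for groupByKey().count(): one counter per composite key.
    // Each record is given as {userId, objectId, entryTimestamp}.
    static Map<String, Long> countByKey(List<String[]> records) {
        Map<String, Long> counts = new HashMap<>();
        for (String[] r : records) {
            counts.merge(compositeKey(r[0], r[1], r[2]), 1L, Long::sum);
        }
        return counts;
    }
}
```

In the topology, the same `compositeKey` call could be used inside `selectKey`, with the three arguments taken from whatever accessors the value class actually exposes. A separator that cannot occur in the IDs makes the key easier to split back into its parts later.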