My application has three topics that receive some events belonging to users:
Event Type A -> Topic A
Event Type B -> Topic B
Event Type C -> Topic C
This would be an example of the flow of messages:
Message(user 1 - event A - 2020-01-03)
Message(user 2 - event A - 2020-01-03)
Message(user 1 - event C - 2020-01-20)
Message(user 1 - event B - 2020-01-22)
I want to be able to generate reports with the total number of events per user per month, aggregating all the events from the three topics, something like:
User 1 - 2020-01 -> 3 total events
User 2 - 2020-01 -> 1 total events
Having three KStreams (one per topic), how can I perform this addition per month to have the summation of all the events from three different topics? Can you show the code for this?
Because you are only interested in counting, the simplest way is to keep the user id as the key and a dummy value in each KStream, merge all three streams, and do a windowed count afterwards (note that calendar-based windows are not supported out of the box; you could use a 31-day window as an approximation, or build your own custom windows):
// just map to a dummy empty string (note that `null` would not work, because records with a null value are ignored by the aggregation)
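// explicit type arguments keep the key type from being inferred as Object in the chained call
KStream<UserId, String> streamA = builder.<UserId, Object>stream("topic-A").mapValues(v -> "");
KStream<UserId, String> streamB = builder.<UserId, Object>stream("topic-B").mapValues(v -> "");
KStream<UserId, String> streamC = builder.<UserId, Object>stream("topic-C").mapValues(v -> "");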
streamA.merge(streamB).merge(streamC).groupByKey().windowBy(...).count();
You might also be interested in the suppress() operator, which can hold back intermediate updates so that only the final count per window is emitted.
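On the calendar-month caveat: one way to picture the "build your own" option is to bucket each event by user and calendar month rather than by a fixed 31-day window. The following is a minimal plain-Java sketch of that bucketing logic only, with no Kafka Streams dependency. The class `MonthlyEventCount`, the `Event` type, the `"user|yyyy-MM"` key format, and the choice of UTC are all assumptions made for illustration; none of them come from the Kafka Streams API.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: illustrates calendar-month bucketing only.
class MonthlyEventCount {

    // Hypothetical event shape: a user id plus the event time in epoch milliseconds.
    static final class Event {
        final String userId;
        final long timestampMs;

        Event(String userId, long timestampMs) {
            this.userId = userId;
            this.timestampMs = timestampMs;
        }
    }

    // Builds a composite "user|yyyy-MM" key. UTC is an assumed time zone.
    static String monthKey(String userId, long timestampMs) {
        YearMonth month = YearMonth.from(Instant.ofEpochMilli(timestampMs).atZone(ZoneOffset.UTC));
        return userId + "|" + month;
    }

    // Counts events per composite key across all input lists (one list per topic).
    @SafeVarargs
    static Map<String, Long> countPerUserMonth(List<Event>... topics) {
        Map<String, Long> counts = new TreeMap<>();
        for (List<Event> topic : topics) {
            for (Event e : topic) {
                counts.merge(monthKey(e.userId, e.timestampMs), 1L, Long::sum);
            }
        }
        return counts;
    }

    // Converts an ISO date such as "2020-01-03" to epoch milliseconds at midnight UTC.
    static long at(String isoDate) {
        return LocalDate.parse(isoDate).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // The four messages from the question, split by topic.
        List<Event> topicA = List.of(new Event("1", at("2020-01-03")), new Event("2", at("2020-01-03")));
        List<Event> topicB = List.of(new Event("1", at("2020-01-22")));
        List<Event> topicC = List.of(new Event("1", at("2020-01-20")));

        // prints {1|2020-01=3, 2|2020-01=1}
        System.out.println(countPerUserMonth(topicA, topicB, topicC));
    }
}
```

In a topology, the same composite key could in principle be set on the merged stream before grouping, assuming the event timestamp is available in the record value. The count would then no longer be windowed, so its state would grow with the number of user/month pairs unless old entries are cleaned up some other way.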