Tags: apache-kafka, apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Aggregation (summation) of number of events from different Kafka topics


My application has three topics that receive some events belonging to users:

Event Type A -> Topic A
Event Type B -> Topic B
Event Type C -> Topic C

This would be an example of the flow of messages:

Message(user 1 - event A - 2020-01-03) 
Message(user 2 - event A - 2020-01-03) 
Message(user 1 - event C - 2020-01-20)
Message(user 1 - event B - 2020-01-22)

I want to be able to generate reports with the total number of events per user per month, aggregating all the events from the three topics, something like:

User 1 - 2020-01 -> 3 total events
User 2 - 2020-01 -> 1 total events

Given three KStreams (one per topic), how can I sum the events from all three topics per user per month? Can you show the code for this?


Solution

  • Because you are only interested in counting, the simplest approach is to keep the user ID as the key, map the value of each KStream to a dummy value, merge the three streams, and apply a windowed count afterwards. Note that calendar-based windows are not supported out of the box; you could use a 31-day window as an approximation or build your own custom windows:

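    // Map every value to a dummy empty string. Note that `null` would not work,
    // because count() ignores records with a null value.
    // The explicit <UserId, Object> type arguments are needed for the chained call to compile;
    // this assumes matching default serdes are configured.
    KStream<UserId, String> streamA = builder.<UserId, Object>stream("topic-A").mapValues(v -> "");
    KStream<UserId, String> streamB = builder.<UserId, Object>stream("topic-B").mapValues(v -> "");
    KStream<UserId, String> streamC = builder.<UserId, Object>stream("topic-C").mapValues(v -> "");

    // Merge the three streams, group by user ID, and count per window.
    streamA.merge(streamB).merge(streamC).groupByKey().windowBy(...).count();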
    

    You might also be interested in the suppress() operator, which can hold back intermediate updates so that only the final count per window is emitted.
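
    If an approximate 31-day window is not good enough, one possible alternative is to put the calendar month into the grouping key and count per (user, month) instead of per window. The sketch below is plain Java, not Kafka Streams code: it only illustrates the key construction and the merge-then-count logic on the sample messages from the question. The `Event` class, the `user|yyyy-MM` key format, and the `MonthlyEventCount` name are assumptions made up for this illustration. In a real topology the same key function could be applied with a re-keying step before `groupByKey().count()`, with the caveat that a non-windowed count has no retention, so old months would stay in the state store.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class MonthlyEventCount {

    // Hypothetical event shape: only the user ID and the event date matter for counting.
    static final class Event {
        final String userId;
        final LocalDate date;

        Event(String userId, LocalDate date) {
            this.userId = userId;
            this.date = date;
        }
    }

    // Composite key "<user>|<yyyy-MM>"; YearMonth.toString() yields e.g. "2020-01".
    static String monthKey(Event e) {
        return e.userId + "|" + YearMonth.from(e.date);
    }

    // Merge the per-topic event lists and count events per (user, calendar month).
    @SafeVarargs
    static Map<String, Long> countPerUserMonth(List<Event>... topics) {
        return Stream.of(topics)
                .flatMap(List::stream)
                .collect(Collectors.groupingBy(
                        MonthlyEventCount::monthKey, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // The four sample messages from the question, split by topic.
        List<Event> topicA = List.of(
                new Event("user-1", LocalDate.of(2020, 1, 3)),
                new Event("user-2", LocalDate.of(2020, 1, 3)));
        List<Event> topicB = List.of(new Event("user-1", LocalDate.of(2020, 1, 22)));
        List<Event> topicC = List.of(new Event("user-1", LocalDate.of(2020, 1, 20)));

        // Prints:
        // user-1|2020-01 -> 3 total events
        // user-2|2020-01 -> 1 total events
        countPerUserMonth(topicA, topicB, topicC)
                .forEach((key, total) -> System.out.println(key + " -> " + total + " total events"));
    }
}
```

    Because the month is part of the key, an event dated 2020-02-01 lands in a separate `2020-02` bucket, which a fixed 31-day window cannot guarantee.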