java apache-kafka-streams

Complex aggregation


I have data in a topic that needs to be counted at multiple levels, but all the code and articles I can find only cover the word count example.

An example of the data would be:

serial: 123 country: us date: 01/05/2018 state: new york city: nyc visitors: 5

serial: 123 country: us date: 01/06/2018 state: new york city: queens visitors: 10

serial: 456 date: 01/06/2018 country: us state: new york city: queens visitors: 27

serial: 123 date: 01/06/2018 country: us state: new york city: nyc visitors: 867

I have done the filter and the groupBy, but I am stuck on the aggregate. Sorry for the mix of Java 8 lambdas and older-style anonymous classes; I prefer Java 8 but I am learning it at the same time.

KTable<String, CountryVisitorModel> countryStream1 = inStream
    .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
    // re-key by serial so that all records with the same serial are aggregated together
    .groupBy((key, value) -> value.serial)
    .aggregate(
        new Initializer<CountryVisitorModel>() {
            @Override
            public CountryVisitorModel apply() {
                return new CountryVisitorModel();
            }
        },
        new Aggregator<String, InputModel, CountryVisitorModel>() {
            @Override
            public CountryVisitorModel apply(String key, InputModel value, CountryVisitorModel aggregate) {
                aggregate.serial = value.serial;
                aggregate.country_name = value.country_name;
                aggregate.city_name = value.city_name;

                // each incoming record increments every counter by one
                aggregate.country_count++;
                aggregate.city_count++;
                aggregate.ip_count++;

                return aggregate;
            }
        },
        Materialized.with(stringSerde, visitorSerde));

For all records with the same serial (this would be the group by), I want to count the total number of visitors per combination of the following fields:

serial country state city total_num_visitors
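To make the desired result concrete, here is a minimal plain-Java sketch that sums the visitors field per serial/country/state/city combination for the four sample records above. It does not use Kafka Streams at all; the class names Visit and VisitorTotals and the "|"-separated composite key are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the desired totals; no Kafka involved.
// Visit and VisitorTotals are hypothetical names, not taken from the post.
final class Visit {
    final String serial;
    final String country;
    final String state;
    final String city;
    final int visitors;

    Visit(String serial, String country, String state, String city, int visitors) {
        this.serial = serial;
        this.country = country;
        this.state = state;
        this.city = city;
        this.visitors = visitors;
    }

    // Composite grouping key (assumed format): one total per serial/country/state/city.
    String groupKey() {
        return serial + "|" + country + "|" + state + "|" + city;
    }
}

final class VisitorTotals {
    // Sums the visitors field per composite key, keeping first-seen order.
    static Map<String, Integer> totals(Iterable<Visit> visits) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Visit v : visits) {
            result.merge(v.groupKey(), v.visitors, Integer::sum);
        }
        return result;
    }

    // The four sample records from the question.
    static List<Visit> sample() {
        return Arrays.asList(
            new Visit("123", "us", "new york", "nyc", 5),
            new Visit("123", "us", "new york", "queens", 10),
            new Visit("456", "us", "new york", "queens", 27),
            new Visit("123", "us", "new york", "nyc", 867));
    }
}
```

For the sample data this gives three rows: 872 for 123/us/new york/nyc, 10 for 123/us/new york/queens, and 27 for 456/us/new york/queens.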


Solution

  • If each record contributes to exactly one count, I would recommend using branch() to split the stream and counting per sub-stream:

    KStream stream = builder.stream(...);
    KStream[] subStreams = stream.branch(...);
    
    // each record of `stream` ends up in at most _one_ sub-stream (the first whose predicate matches)
    subStreams[0].groupByKey().count(); // or aggregate() instead of count()
    subStreams[1].groupByKey().count();
    // ...
    

    If branching does not work because a single record needs to go into multiple counts, you can "broadcast" the stream and filter:

    KStream stream = builder.stream(...);
    
    // each record in `stream` will be "duplicated" and sent to all filters
    stream.filter(...).groupByKey().count(); // or aggregate() instead of count()
    stream.filter(...).groupByKey().count();
    // ...
    

    When you use the same KStream object multiple times and apply multiple operators to it (in our case filter()), each record is "broadcast" to all of those operators. Note that records (i.e., objects) are not physically copied in this case; the same input record object is passed to each filter().
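The difference between the two routing styles can be modelled in plain Java, without the Kafka Streams API. This is only a sketch of the semantics under stated assumptions: RoutingSketch and its two methods are hypothetical names, and lists stand in for streams. With overlapping predicates, the branch-style routing puts a record into the first matching bucket only, while the broadcast-style routing lets every predicate see every record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the two routing styles; it does not call Kafka Streams.
// RoutingSketch, branch and broadcast are hypothetical names for illustration.
final class RoutingSketch {

    // Branch-style: a record goes to the FIRST predicate it matches,
    // so it lands in at most one bucket (and is dropped if nothing matches).
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> buckets = emptyBuckets(predicates.length);
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    buckets.get(i).add(record);
                    break; // stop at the first match
                }
            }
        }
        return buckets;
    }

    // Broadcast-style: every predicate is evaluated against every record,
    // so one record may land in several buckets. The same object reference
    // is added to each bucket; nothing is copied.
    @SafeVarargs
    static <T> List<List<T>> broadcast(List<T> records, Predicate<T>... predicates) {
        List<List<T>> buckets = emptyBuckets(predicates.length);
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    buckets.get(i).add(record);
                }
            }
        }
        return buckets;
    }

    private static <T> List<List<T>> emptyBuckets(int n) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buckets.add(new ArrayList<>());
        }
        return buckets;
    }
}
```

For example, with the visitor counts 5, 10, 27, 867 and the predicates "at least 10" and "at least 20", the branch-style call yields [10, 27, 867] and an empty second bucket, whereas the broadcast-style call yields [10, 27, 867] and [27, 867].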