java, apache-kafka, apache-kafka-streams

Kafka Streams: handle batch data to reset aggregation


I have some data arriving in my Kafka topic "datasource" with the following schema (simplified for demo here):

{ "deal" : -1, "location": "", "value": -1, "type": "init" }
{ "deal": 123456, "location": "Mars", "value": 100.0, "type": "batch" },
{ "deal" 123457, "location": "Earth", "value", 200.0, "type": "batch" },
{ "deal": -1, "location": "", "value", -1, "type": "commit" }

This data comes from a batch run that takes all deals and recalculates their values. Think of it as a day-start process: at this point, here is a fresh set of data for all locations. At the moment the init and commit messages are not sent to the real topic; they are filtered out by the producer.
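For reference, these records map onto a POJO roughly like the following (simplified; the field mapping is taken from the JSON above):

public class PositionValue {
    public long deal;       // -1 on the init/commit marker records
    public String location; // "" on the marker records
    public double value;    // -1 on the marker records
    public String type;     // "init", "batch", "update" or "commit"
}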

During the day, there are then updates as things change. This provides new data (in this example we can ignore overwriting data, as this would be handled by re-running the batch):

{ "deal": 123458, "location", "Mars", "value": 150.0, "type": "update" }

This data comes into the application as a KStream "positions".


Another topic "locations" has a list of possible locations. These are pulled into a java kafka-streams application as a KGlobalTable locations:

{ "id": 1, "name": "Mars" },
{ "id": 2, "name": "Earth"}

The plan is to use a Java 9 Kafka Streams application to aggregate these values, grouped by location. The output should look something like:

{ "id": 1, "location": "Earth", "sum": 250.0 },
{ "id": 2, "location": "Mars": "sum": 200.0 }

This is what I have working so far:

StreamsBuilder builder = new StreamsBuilder();

/** snip creating serdes, setting up stores, boilerplate  **/

final GlobalKTable<Integer, Location> locations = builder.globalTable(
                LOCATIONS_TOPIC, 
                /* serdes, materialized, etc */
                );

final KStream<Integer, PositionValue> positions = builder.stream(
                POSITIONS_TOPIC,
                /* serdes, materialized, etc */
            );

/* The real thing is more than just a name, so a transformer is used to match locations to position values, and to filter out the ones we don't care about */
KStream<Location, PositionValue> joined = positions
                .transform(() -> new LocationTransformer(), POSITION_STORE) 
                .peek((location, positionValue) -> { 
                    LOG.debugv("Processed position {0} against location {1}", positionValue, location);
                });

/** This is where it is grouped and aggregated **/
joined.groupByKey(Grouped.with(locationSerde, positionValueSerde))
            .aggregate(Aggregation::new, /* initializer */
                       (location, positionValue, aggregation) -> aggregation.updateFrom(location, positionValue), /* adder */
                Materialized.<Location, Aggregation>as(aggrStoreSupplier)
                    .withKeySerde(locationSerde)
                    .withValueSerde(aggregationSerde)
            );

Topology topo = builder.build();
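For illustration, a simplified LocationTransformer along these lines fits the topology above. The real matching and filtering logic is more involved, and the store contents here are an assumption; only the type signature and the POSITION_STORE name come from the code above.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class LocationTransformer
        implements Transformer<Integer, PositionValue, KeyValue<Location, PositionValue>> {

    private KeyValueStore<String, Location> locationStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        /* POSITION_STORE is the store name passed to transform() above; this
           sketch assumes it maps location names to Location objects */
        locationStore = (KeyValueStore<String, Location>) context.getStateStore(POSITION_STORE);
    }

    @Override
    public KeyValue<Location, PositionValue> transform(Integer key, PositionValue value) {
        Location location = locationStore.get(value.location);
        if (location == null) {
            return null; /* returning null drops positions we don't care about */
        }
        return KeyValue.pair(location, value);
    }

    @Override
    public void close() { }
}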

The problem I have is that this aggregates everything - the daily batch, plus updates, then the next daily batch, all get added together. Basically, I need a way to say "here is the next set of batch data, reset against this". I do not know how to do this - help please!

Thanks


Solution

  • So if I understood you correctly, you want to aggregate the data, but only for the last day, and discard the rest.

    I suggest that you aggregate into an intermediary class that holds all the values from the stream and also contains the logic for filtering out the other days' data. If I understood you correctly, that would mean discarding everything that arrived before the latest set of "batch" records - a sketch of this idea follows below.

    I have done a similar solution, although in Kotlin, that you can look at if you need.
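    Here is a minimal sketch of that intermediary class in Java, matching the question's language: it retains the values seen so far and treats the first "batch" record that follows non-batch data as the start of a new batch run, discarding the stale values at that point. The class name Aggregation and the updateFrom signature come from the question's topology; the reset rule is an assumption that relies on each location's batch records arriving as one contiguous block.

    import java.util.ArrayList;
    import java.util.List;

    public class Aggregation {

        private final List<PositionValue> values = new ArrayList<>();
        private String lastType = "";

        public Aggregation updateFrom(Location location, PositionValue position) {
            if ("batch".equals(position.type) && !"batch".equals(lastType)) {
                /* first "batch" record after non-batch data: a new batch run
                   has started for this location, so older values are stale */
                values.clear();
            }
            values.add(position);
            lastType = position.type;
            return this;
        }

        /* the exposed sum only ever reflects the latest batch plus its updates */
        public double getSum() {
            return values.stream().mapToDouble(v -> v.value).sum();
        }
    }

    Because the question's adder already calls aggregation.updateFrom(location, positionValue), this drops into the existing groupByKey/aggregate wiring unchanged. If the producer stopped filtering out the init and commit markers, the reset could instead be triggered explicitly by the "init" record, but those markers would then need a location key so that they reach the right aggregate.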