apache-kafka, apache-kafka-streams

Kafka Streams aggregation by two fields


I'm using Kafka Streams to aggregate (sum) the quantity of each item in a warehouse.
An item can be added (e.g. bought from a supplier) or removed (e.g. sold).
In the application, one warehouse can serve several stores, and the company has several warehouses.

In this case, I need to group and sum the transactions by two fields: item name and store name.

Summing by item name alone (one field) is straightforward, but how do I group by an additional field, e.g. total inventory per item for each store, or total inventory per item for each warehouse?

My (oversimplified) code

InventoryKafkaMessage.java

public class InventoryKafkaMessage {

    private String warehouseId;  // warehouse ID
    private String itemName;  // item name
    private long quantity;  // always positive
    private String type;  // ADD or REMOVE
    private String storeLocation; // store ID
    private long transactionTimestamp;
    // ... some others, but not relevant for this question
}

Messages are sent to the source topic with the item name as the key.

InventoryAggregatorStream.java
The stream is defined as:

        var inventorySerde = new JsonSerde<>(InventoryKafkaMessage.class);
        var sourceStream = builder.stream("supplychain-warehouse-inventory", Consumed.with(Serdes.String(), inventorySerde));

        // aggregating by key (item name): ADD counts as positive, REMOVE as negative
        sourceStream.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
                .groupByKey()
                .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue,
                        Materialized.with(Serdes.String(), Serdes.Long()))
                .toStream().to("stream-supplychain-wharehouse-inventory-total", Produced.with(Serdes.String(), Serdes.Long()));


Solution

  • To group on multiple attributes, you can define a composite type that holds both attributes and use it as the key. For example, you can define a type:

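    public class GroupingKey {
      private String warehouseId;
      private String itemName;

      public GroupingKey(String warehouseId, String itemName) {
        this.warehouseId = warehouseId;
        this.itemName = itemName;
      }
      // etc
    }

    // usage:

    KStream<String, InventoryKafkaMessage> sourceStream =
        builder.stream("supplychain-warehouse-inventory",
                       Consumed.with(Serdes.String(), inventorySerde));
    // the message fields are private, so read them through getters
    // (assumed to exist, like the getType()/getQuantity() used in the question)
    KStream<GroupingKey, InventoryKafkaMessage> newKeyStream =
        sourceStream.selectKey((k, v) -> new GroupingKey(v.getWarehouseId(), v.getItemName()));

    // the key changed, so groupByKey() repartitions the data and needs a Serde
    // for GroupingKey (e.g. passed via Grouped.with(...))
    newKeyStream.groupByKey()...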
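
To see what the re-keyed aggregation computes, here is a minimal plain-Java sketch that runs without Kafka. It is not Kafka Streams code: `InventoryEvent`, `InventoryTotals`, `totalsBy` and `sampleEvents` are hypothetical names made up for this illustration, `InventoryEvent` is a trimmed-down stand-in for `InventoryKafkaMessage`, and the sample data is invented. The key selector plays the role of `selectKey(...)`, and the `merge` call plays the role of `groupByKey().aggregate(...)`. This `GroupingKey` uses generic field names so the same type can serve both the warehouse/item grouping and the store/item grouping asked about in the question.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for InventoryKafkaMessage, reduced to the relevant fields.
final class InventoryEvent {
    final String warehouseId;
    final String itemName;
    final String storeLocation;
    final String type;   // ADD or REMOVE
    final long quantity; // always positive

    InventoryEvent(String warehouseId, String itemName, String storeLocation,
                   String type, long quantity) {
        this.warehouseId = warehouseId;
        this.itemName = itemName;
        this.storeLocation = storeLocation;
        this.type = type;
        this.quantity = quantity;
    }

    // Same sign rule as the mapValues() step in the question.
    long signedQuantity() {
        return type.equalsIgnoreCase("ADD") ? quantity : -quantity;
    }
}

// Composite key with value equality, so equal field pairs fall into the same group.
final class GroupingKey {
    final String first;
    final String second;

    GroupingKey(String first, String second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupingKey)) return false;
        GroupingKey other = (GroupingKey) o;
        return first.equals(other.first) && second.equals(other.second);
    }

    @Override
    public int hashCode() {
        return Objects.hash(first, second);
    }

    @Override
    public String toString() {
        return first + "|" + second;
    }
}

final class InventoryTotals {
    // Re-key every event with keySelector, then sum the signed quantities per key.
    static Map<GroupingKey, Long> totalsBy(List<InventoryEvent> events,
                                           Function<InventoryEvent, GroupingKey> keySelector) {
        Map<GroupingKey, Long> totals = new LinkedHashMap<>();
        for (InventoryEvent e : events) {
            totals.merge(keySelector.apply(e), e.signedQuantity(), Long::sum);
        }
        return totals;
    }

    // Invented sample data: two warehouses, two stores, one item.
    static List<InventoryEvent> sampleEvents() {
        return List.of(
            new InventoryEvent("WH-1", "widget", "store-A", "ADD", 10),
            new InventoryEvent("WH-1", "widget", "store-B", "ADD", 5),
            new InventoryEvent("WH-1", "widget", "store-A", "REMOVE", 3),
            new InventoryEvent("WH-2", "widget", "store-A", "ADD", 7));
    }

    public static void main(String[] args) {
        List<InventoryEvent> events = sampleEvents();
        // Total inventory per warehouse, per item name.
        System.out.println(totalsBy(events, e -> new GroupingKey(e.warehouseId, e.itemName)));
        // prints {WH-1|widget=12, WH-2|widget=7}
        // Total inventory per store, per item name.
        System.out.println(totalsBy(events, e -> new GroupingKey(e.storeLocation, e.itemName)));
        // prints {store-A|widget=14, store-B|widget=5}
    }
}
```

Only the key selector differs between the two groupings. In a Kafka Streams topology, the analogous change would be the lambda passed to `selectKey`, with a separate output topic for each grouping.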