
How does this aggregation work in Kafka Streams?


I'm new to Apache Kafka. I was reading the code of a stream application and came across an aggregation operation. I've tried to understand it on my own, and I'd like confirmation that my interpretation is correct.

The code snippet that reads from the topic and performs the aggregation is below:

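import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// JSON Serde built from Kafka Connect's JsonSerializer / JsonDeserializer
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

// KStreamBuilder is the pre-1.0 API (replaced by StreamsBuilder in Kafka 1.0)
KStreamBuilder builder = new KStreamBuilder();

// read from the topic 'bank-transactions' as a KStream (I provided the producer below)
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");

// group by the existing record key (the customer name), then aggregate per key
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
    .aggregate(
            () -> initialBalance,                                             // initializer
            (key, transaction, balance) -> newBalance(transaction, balance), // aggregator
            jsonSerde,                                                        // serde for the aggregated value
            "bank-balance-agg"                                                // state store name
    );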
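As a rough illustration of what the two serdes in this snippet are responsible for, here is a plain-Java sketch with no Kafka dependency. It assumes a String serde simply converts keys to and from UTF-8 bytes, and it models grouping as collecting values under the decoded key; a JSON string stands in for JsonNode. The class and method names (SerdeRoleSketch, serializeKey, deserializeKey, groupByKey) are hypothetical, not Kafka APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SerdeRoleSketch {
    // Hypothetical stand-in for Serdes.String(): String key -> UTF-8 bytes.
    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for Serdes.String(): UTF-8 bytes -> String key.
    static String deserializeKey(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Groups raw (keyBytes, value) records by their key.
    // The serde only decodes the key; the grouping criterion is the key value itself.
    static Map<String, List<String>> groupByKey(List<Map.Entry<byte[], String>> records) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Map.Entry<byte[], String> record : records) {
            String key = deserializeKey(record.getKey());
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<byte[], String>> records = List.of(
            Map.entry(serializeKey("john"), "{\"amount\":10}"),
            Map.entry(serializeKey("alice"), "{\"amount\":5}"),
            Map.entry(serializeKey("john"), "{\"amount\":7}"));
        System.out.println(groupByKey(records));
    }
}
```

Swapping the key serde for another String-compatible one would not change which records end up together; only the key values do.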

The data in the bank-transactions topic is produced as follows:

public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty JSON object {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();

    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);

    // Instant.now() returns the current time (java.time API, Java 8+)
    Instant now = Instant.now();

    // we write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());

    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
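To see the shape of each record the stream reads (key = customer name, value = JSON text), here is a minimal stdlib-only sketch of the same producer logic without Kafka or Jackson. The KeyValue record is a hypothetical stand-in for ProducerRecord<String, String>, and the JSON is built by string concatenation, which assumes names need no JSON escaping (no quotes or backslashes). Records require Java 16+.

```java
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

class TransactionSketch {
    // Hypothetical stand-in for the (key, value) pair carried by ProducerRecord<String, String>.
    record KeyValue(String key, String value) {}

    // Mirrors newRandomTransaction: the record key is the name, the value is a JSON string.
    // Assumption: name contains no characters that would need JSON escaping.
    static KeyValue newRandomTransaction(String name) {
        int amount = ThreadLocalRandom.current().nextInt(0, 100); // 0..99 inclusive
        String time = Instant.now().toString();                   // ISO-8601 UTC timestamp
        String json = "{\"name\":\"" + name + "\",\"amount\":" + amount + ",\"time\":\"" + time + "\"}";
        return new KeyValue(name, json);
    }

    public static void main(String[] args) {
        System.out.println(newRandomTransaction("john"));
    }
}
```

Because the name is used as the record key, every transaction for the same customer shares a key, which is what a later groupByKey relies on.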

The initial balance is created as follows:

// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();

initialBalance.put("count", 0);
initialBalance.put("balance", 0);
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

The newBalance method takes a transaction and the current balance and returns the updated balance:

private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
    // create a new balance json object
    ObjectNode newBalance = JsonNodeFactory.instance.objectNode();

    newBalance.put("count", balance.get("count").asInt() + 1);
    newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

    Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
    Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();

    Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
    newBalance.put("time", newBalanceInstant.toString());

    return newBalance;
}
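The same update can be written with typed values instead of JsonNode, which makes the three rules easier to see: increment the count, add the amount, keep the later timestamp. This is a sketch that assumes the JSON fields map one-to-one onto the hypothetical Transaction and Balance records (Java 16+); unlike the original, it compares Instant values directly rather than truncating to epoch milliseconds first. The epoch-zero time in INITIAL_BALANCE plays the same role as in initialBalance: any real transaction time is later, so the first transaction's time always wins.

```java
import java.time.Instant;

class BalanceSketch {
    // Typed stand-ins for the JSON documents (assumed 1:1 with the JsonNode fields).
    record Transaction(String name, int amount, Instant time) {}
    record Balance(int count, int balance, Instant time) {}

    // Equivalent of initialBalance: zero count, zero balance, epoch-zero time.
    static final Balance INITIAL_BALANCE = new Balance(0, 0, Instant.ofEpochMilli(0L));

    // Same rules as newBalance(JsonNode, JsonNode).
    static Balance newBalance(Transaction transaction, Balance balance) {
        Instant latest = transaction.time().isAfter(balance.time()) ? transaction.time() : balance.time();
        return new Balance(balance.count() + 1, balance.balance() + transaction.amount(), latest);
    }

    public static void main(String[] args) {
        Balance b = INITIAL_BALANCE;
        b = newBalance(new Transaction("john", 10, Instant.parse("2017-07-19T05:24:52Z")), b);
        // An out-of-order (earlier) transaction still counts, but does not move the time back.
        b = newBalance(new Transaction("john", 7, Instant.parse("2017-07-19T05:20:00Z")), b);
        System.out.println(b); // Balance[count=2, balance=17, time=2017-07-19T05:24:52Z]
    }
}
```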

I have two questions about the grouping and aggregation:

a. Is groupByKey grouping by the Serdes.String() key, while jsonSerde only performs serialization and deserialization of the stream data? The key handled by Serdes.String() is the name String passed to the newRandomTransaction method.

b. My assumption is that key and transaction in the aggregator (key, transaction, balance) -> newBalance(transaction, balance) are read from the bank-transactions topic, and that balance comes from initialBalance on the previous line. Is that correct?

I was also puzzled while trying to debug the app, even though it runs without problems.


Solution

  • Is groupByKey grouping by Serdes.String(), with jsonSerde only performing the serialization and deserialization of the stream data?

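    Yes. groupByKey groups records by their existing key (here, the name string set by the producer). Serdes.String() does not choose what to group by; it only tells Kafka Streams how to serialize and deserialize those keys, just as jsonSerde does for the JSON values.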

  • My assumption is that key and transaction in the aggregator (key, transaction, balance) -> newBalance(transaction, balance) are read from the bank-transactions topic, and that balance comes from initialBalance on the previous line.

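    Almost. The initializer (the first parameter) does supply the starting balance, but only for the first record of each key. After that, the aggregated result for each key is kept in the bank-balance-agg state store and carried forward for as long as the application runs, so the aggregation keeps accumulating indefinitely.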

    In other words, the first transaction for a given key is combined with initialBalance, and every later transaction with the same key is added to the balance accumulated so far for that key. The balance argument equals initialBalance only the first time a key is seen.

    And yes, key and transaction come from the input topic, bank-transactions, which you specified in the builder.stream(...) call.
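The following is a minimal, single-threaded, in-memory model of the per-key behaviour of groupByKey().aggregate(...) described above; it is not Kafka's implementation. A HashMap plays the role of the bank-balance-agg state store, the initializer runs only when a key has no aggregate yet, and balances are plain ints instead of JSON. The Aggregator interface here is hypothetical and only mirrors the (key, value, aggregate) -> newAggregate shape of the lambda in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class AggregateSketch {
    // Hypothetical interface mirroring the (key, value, aggregate) -> newAggregate lambda.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Simplified model of groupByKey().aggregate(initializer, aggregator, ...).
    // 'store' stands in for the state store; it outlives a single call.
    static <K, V, A> List<A> aggregate(List<Map.Entry<K, V>> records,
                                       Supplier<A> initializer,
                                       Aggregator<K, V, A> aggregator,
                                       Map<K, A> store) {
        // Aggregate after each input (Kafka Streams may forward fewer updates when caching is enabled).
        List<A> updates = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // The initializer is used only if this key has no aggregate yet.
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            A updated = aggregator.apply(key, record.getValue(), current);
            store.put(key, updated);
            updates.add(updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        List<Map.Entry<String, Integer>> transactions = List.of(
            Map.entry("john", 10), Map.entry("alice", 5), Map.entry("john", 7));
        // Initializer returns 0; aggregator adds the transaction amount to the running balance.
        List<Integer> updates = aggregate(transactions, () -> 0, (name, amount, balance) -> balance + amount, store);
        System.out.println(updates);           // [10, 5, 17]
        System.out.println(store.get("john")); // 17
    }
}
```

The second "john" record starts from 10, the value left in the store by the first one, not from the initializer; that is the "carried forward" behaviour.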