I'm new to Apache Kafka. I was reading the code of a Kafka Streams application and stumbled upon the aggregation operation. I'm trying to understand it on my own and would like confirmation that my interpretation is correct.
The code snippet that reads from the topic and performs the aggregation is below:
// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStreamBuilder builder = new KStreamBuilder();
// read from the topic 'bank-transactions' as `KStream`. I provided the producer below
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
// we define the grouping and aggregation here
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> initialBalance,
(key, transaction, balance) -> newBalance(transaction, balance),
jsonSerde,
"bank-balance-agg"
);
The data in the bank-transactions topic is produced as follows:
public static ProducerRecord<String, String> newRandomTransaction(String name) {
// creates an empty json {}
ObjectNode transaction = JsonNodeFactory.instance.objectNode();
Integer amount = ThreadLocalRandom.current().nextInt(0, 100);
// Instant.now() returns the current time (Java 8 java.time API)
Instant now = Instant.now();
// we write the data to the json document
transaction.put("name", name);
transaction.put("amount", amount);
transaction.put("time", now.toString());
return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
The initial balance is initialized as follows:
// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
initialBalance.put("count", 0);
initialBalance.put("balance", 0);
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());
The newBalance method takes a transaction and the current balance and returns the new balance:
private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
// create a new balance json object
ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
newBalance.put("count", balance.get("count").asInt() + 1);
newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());
long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
newBalance.put("time", newBalanceInstant.toString());
return newBalance;
}
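To check the arithmetic in newBalance without a Kafka or Jackson setup, here is a minimal plain-Java sketch of the same logic. The Transaction and Balance records are hypothetical stand-ins for the JSON documents (records assume Java 16+); their fields mirror the JSON keys "name", "amount", "count", "balance" and "time".

```java
import java.time.Instant;

// Jackson-free model of the question's newBalance(transaction, balance).
// Transaction and Balance are hypothetical records standing in for the JSON documents.
class BalanceSketch {

    record Transaction(String name, int amount, Instant time) {}

    record Balance(int count, int balance, Instant time) {}

    // Mirrors initialBalance: count 0, balance 0, time = 1970-01-01T00:00:00Z.
    static Balance initialBalance() {
        return new Balance(0, 0, Instant.ofEpochMilli(0L));
    }

    static Balance newBalance(Transaction transaction, Balance balance) {
        // Like the original: keep the later of the two timestamps, compared in epoch millis
        // (so any sub-millisecond precision from Instant.now() is dropped).
        long latestMillis = Math.max(balance.time().toEpochMilli(), transaction.time().toEpochMilli());
        return new Balance(
                balance.count() + 1,                      // one more transaction seen
                balance.balance() + transaction.amount(), // running total
                Instant.ofEpochMilli(latestMillis));
    }

    public static void main(String[] args) {
        Balance b = initialBalance();
        b = newBalance(new Transaction("john", 25, Instant.parse("2024-01-01T10:00:00Z")), b);
        // An older transaction still adds to the total but does not move the time backwards.
        b = newBalance(new Transaction("john", 40, Instant.parse("2024-01-01T09:00:00Z")), b);
        System.out.println(b); // Balance[count=2, balance=65, time=2024-01-01T10:00:00Z]
    }
}
```

The sample transactions are made up; they only show that count and balance accumulate while time keeps the latest timestamp seen.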
I have two questions about the grouping and aggregation:
a. Is groupByKey grouping by the Serdes.String(), and is jsonSerde only performing the serialization and deserialization of the stream data? The Serdes.String() corresponds to the name String (the record key) in the newRandomTransaction method.
b. My assumption is that key and transaction in the aggregator lambda (key, transaction, balance) -> newBalance(transaction, balance) are read from the bank-transactions topic, and that balance comes from initialBalance in the previous line. Is that correct?
I'm also puzzled when I try to debug the app, even though it runs without problems.
Is groupByKey grouping by the Serdes.String(), and is jsonSerde only performing the serialization and deserialization of the stream data?
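Yes, in the sense that groupByKey groups by the record keys (the name strings), which Serdes.String() serializes and deserializes; jsonSerde does the same for the values. Neither Serde decides the grouping itself. Kafka Streams uses them whenever it has to write keys and values out, for example to a repartition topic or to the state store.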
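A Serde is essentially a pair of functions: one turns a value into bytes, the other turns bytes back into the value. The sketch below models that idea with a hypothetical SimpleSerde record (not Kafka's Serde interface, which exposes serializer() and deserializer()), encoding strings as UTF-8 the way Serdes.String() does by default. It shows that equal keys produce equal bytes, so the key value, not the Serde, determines which group a record joins.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Models what a Serde does: a pair of functions converting a value to bytes and back.
// SimpleSerde is hypothetical; Kafka's real Serde<T> exposes serializer() and deserializer().
class SerdeSketch {

    record SimpleSerde<T>(Function<T, byte[]> serialize, Function<byte[], T> deserialize) {}

    // Rough analogue of Serdes.String(), whose serializer encodes strings as UTF-8 by default.
    static final SimpleSerde<String> STRING = new SimpleSerde<String>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            bytes -> new String(bytes, StandardCharsets.UTF_8));

    public static void main(String[] args) {
        // Two records keyed "john" serialize to identical bytes, so they end up in the same group.
        byte[] first = STRING.serialize().apply("john");
        byte[] second = STRING.serialize().apply("john");
        System.out.println(Arrays.equals(first, second)); // true
        // Deserializing restores the original key; the Serde only converts, it never groups.
        System.out.println(STRING.deserialize().apply(first)); // john
    }
}
```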
My assumption is that key and transaction in the aggregator lambda (key, transaction, balance) -> newBalance(transaction, balance) are read from the bank-transactions topic, and that balance comes from initialBalance in the previous line.
Almost. The initializer is the first parameter, yes, but it only supplies the starting value. The aggregated result is then carried forward for the entire lifetime of the application (it is kept in the "bank-balance-agg" state store), so the aggregation never stops.
In other words, every key starts from initialBalance; then, for each new transaction with the same key, its amount is added to the balance accumulated so far for that key (and count is incremented). Only the first transaction seen for a key is combined with initialBalance itself; every later transaction is combined with the previous result for that key.
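The per-key carry-forward can be modeled in plain Java, assuming records are processed one at a time in arrival order. This is a sketch of the semantics only, not the Kafka Streams API: the Aggregator interface, the aggregate helper and the sample input are hypothetical, and the HashMap stands in for the "bank-balance-agg" state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of groupByKey().aggregate(initializer, aggregator) semantics.
// NOT the Kafka Streams API; the HashMap stands in for the "bank-balance-agg" state store.
class AggregateTrace {

    // Same shape as the (key, value, aggregate) -> newAggregate lambda in the question.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    record Balance(int count, int balance) {}

    // Feeds records in arrival order and returns every updated (key, aggregate) pair.
    static <K, V, A> List<Map.Entry<K, A>> aggregate(List<Map.Entry<K, V>> records,
                                                     Supplier<A> initializer,
                                                     Aggregator<K, V, A> aggregator) {
        Map<K, A> store = new HashMap<>();
        List<Map.Entry<K, A>> updates = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // The initializer runs only the first time a key is seen.
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            A next = aggregator.apply(key, record.getValue(), current);
            store.put(key, next); // carried forward for the next record with this key
            updates.add(Map.entry(key, next));
        }
        return updates;
    }

    public static void main(String[] args) {
        // Made-up sample input: (name, amount), keyed by name as in newRandomTransaction.
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("john", 10), Map.entry("alice", 5), Map.entry("john", 20));
        List<Map.Entry<String, Balance>> updates = aggregate(
                input,
                () -> new Balance(0, 0),
                (name, amount, b) -> new Balance(b.count() + 1, b.balance() + amount));
        updates.forEach(u -> System.out.println(u.getKey() + " -> " + u.getValue()));
        // john -> Balance[count=1, balance=10]
        // alice -> Balance[count=1, balance=5]
        // john -> Balance[count=2, balance=30]
    }
}
```

The second "john" record starts from john's previous result (count 1, balance 10), not from the initial value. In the real application the state store is also backed by a changelog topic, so the balances survive restarts, and Kafka Streams' record cache may merge consecutive updates for the same key before forwarding them downstream.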
And yes, your input topic is the one specified in the builder.stream(...) call.