I need to merge the payloads of different messages when they share the same reference number (refNo). My limitation is that I can only use a KTable,
and if the key is an even number I don't need to merge the payload. Additionally, the order of incoming messages should not change the result.
For example, if we have an empty topic and the incoming messages are:
1: { key: "1", value: {refNo:1, payload:{data1}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} } // this one should not be affected and is left as it is
Expected result:
1: { key: "1", value: {refNo:1, payload:{data1, data2}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} }
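To make the expected behaviour concrete, here is a minimal plain-Java sketch of the merge rule, without any Kafka involved. The class `MergeByRefNo`, the simplified `Value` type, and the helper `value(...)` are hypothetical stand-ins for illustration only; the payload is modelled as a sorted set of strings so that arrival order cannot change the result.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class MergeByRefNo {
    // Hypothetical, simplified message value: a reference number plus payload items.
    static final class Value {
        final int refNo;
        final TreeSet<String> payload;

        Value(int refNo, Collection<String> payload) {
            this.refNo = refNo;
            this.payload = new TreeSet<>(payload);
        }
    }

    // Convenience factory used only in this sketch.
    static Value value(int refNo, String... items) {
        return new Value(refNo, Arrays.asList(items));
    }

    // Odd-keyed records receive the union of all payloads that share their refNo;
    // even-keyed records are passed through unchanged.
    static Map<String, Value> merge(Map<String, Value> input) {
        // Pass 1: union of payloads per refNo (a sorted set, so insertion order is irrelevant).
        Map<Integer, TreeSet<String>> unionByRefNo = new TreeMap<>();
        for (Value v : input.values()) {
            unionByRefNo.computeIfAbsent(v.refNo, r -> new TreeSet<>()).addAll(v.payload);
        }
        // Pass 2: build the output, keyed by the original message key.
        Map<String, Value> output = new TreeMap<>();
        for (Map.Entry<String, Value> e : input.entrySet()) {
            Value v = e.getValue();
            boolean evenKey = Long.parseLong(e.getKey()) % 2 == 0;
            output.put(e.getKey(), evenKey ? v : new Value(v.refNo, unionByRefNo.get(v.refNo)));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Value> input = new LinkedHashMap<>();
        input.put("1", value(1, "data1"));
        input.put("2", value(1, "data2"));
        input.put("3", value(2, "data3"));
        merge(input).forEach((k, v) ->
            System.out.println(k + ": refNo=" + v.refNo + ", payload=" + v.payload));
    }
}
```

Feeding the three messages in any order gives the same output map, which is the property I need from the Kafka Streams topology as well.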
The only way I can think of is to use .groupBy twice and then join everything with the original topic again:
1. .groupBy refNo, save the original key in the value itself, and merge the payloads during aggregation.
2. .groupBy again to revert the key to its initial state.
I'm pretty sure there's an easier way to do this. What is the most optimized and elegant way to solve this issue?
Edit: This is downstream processing and the result is written to an output topic; the original topic is not modified.
At the moment I'm going with the solution below. It works, but I have no idea how it will perform, whether it can be optimized further, or whether there is a better way to solve my issue.
// Even-keyed records bypass the aggregation and are merged back in at the end.
KStream<String, Value> even = inputTopicStream.filter((key, value) -> value.isEven());
inputTopicStream.toTable(Materialized.with(Serdes.String(), Value.serde))
    // Re-key by refNo and keep the original key inside the value.
    .groupBy(
        (key, value) -> KeyValue.pair(String.valueOf(value.getRefNo()), addKeyToValue(key, value)),
        Grouped.with("aggregation-internal", Serdes.String(), Value.serde))
    .aggregate(
        Value::new,
        (key, value, agg) -> mergePayload(key, value, agg), // adder: ensure that the key is odd after the merge
        (key, value, agg) -> handleSplit(key, value, agg))  // subtractor: undo the contribution of an old value
    .toStream()
    .selectKey((key, value) -> value.getKey()) // restore the original key
    .merge(even) // merge the even-keyed stream back in, because those records were lost during aggregation
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Value.serde));
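For context, this is roughly the shape I have in mind for the aggregate behind mergePayload and handleSplit. It is only a sketch under assumptions: the class `RefNoAggregate` and its methods are hypothetical, the payload is again modelled as a set of strings, and serialization is left out. The idea is to keep each contribution under its original key, so that the adder and subtractor of `KGroupedTable#aggregate` stay symmetric and the merged payload does not depend on arrival order.

```java
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class RefNoAggregate {
    // Original message key -> payload items contributed by that message.
    private final TreeMap<String, TreeSet<String>> contributions = new TreeMap<>();

    // Adder: record (or overwrite) the contribution of one original key.
    RefNoAggregate add(String originalKey, Set<String> payload) {
        contributions.put(originalKey, new TreeSet<>(payload));
        return this;
    }

    // Subtractor: drop the contribution of one original key, e.g. when its
    // table record is updated or deleted.
    RefNoAggregate subtract(String originalKey) {
        contributions.remove(originalKey);
        return this;
    }

    // Union of all current contributions; sorted, so it is independent of
    // the order in which add() was called.
    Set<String> mergedPayload() {
        TreeSet<String> all = new TreeSet<>();
        contributions.values().forEach(all::addAll);
        return all;
    }
}
```

Because a contribution can be removed exactly, an update to a single input record would not require rebuilding the whole group, which is what I hope keeps the aggregation cheap.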