apache-kafka, apache-kafka-streams

How to update related messages in a Kafka topic if the incoming message references them only inside its value?


I need to merge the payloads of messages that share the same reference number (refNo). My constraints are that I can only use a KTable, and that if the key is an even number the payload must not be merged. Additionally, the order in which messages arrive should not change the result.

For example, if we have an empty topic and incoming messages are:

1: { key: "1", value: {refNo:1, payload:{data1}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} } // this one should not be affected and is left as is

Expected result:

1: { key: "1", value: {refNo:1, payload:{data1, data2}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} }
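
For clarity, this is roughly the value type the examples above imply. The class name Value, the list representation of a merged payload, and the getters/setters are assumptions; the real class, its serde, and the isEven() check used in the solution below are not shown in the question.

    import java.util.List;

    // Assumed shape of the record value; not taken from the question.
    public class Value {
        private int refNo;             // reference number shared by messages that should be merged
        private List<String> payload;  // payload items, e.g. [data1, data2] after merging
        private String key;            // original record key, saved here so it survives re-keying

        public int getRefNo() { return refNo; }
        public void setRefNo(int refNo) { this.refNo = refNo; }
        public List<String> getPayload() { return payload; }
        public void setPayload(List<String> payload) { this.payload = payload; }
        public String getKey() { return key; }
        public void setKey(String key) { this.key = key; }
    }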

The only way I can think of to do this is to use .groupBy twice and then join everything with the original topic again.

  1. First, change the key to refNo, save the original key inside the value itself, and merge the payloads during aggregation.
  2. Second, .groupBy again to revert the key to its initial state.
  3. Finally, join everything back to the original topic, because one message per refNo is lost while grouping (a rough sketch of this idea follows the list).
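
A rough sketch of that three-step idea, assuming the helper names used in the solution below (addKeyToValue, mergePayload, handleSplit) and a Key wrapper class, with serdes left out for brevity; this is an illustration of the plan, not working code from the question:

    KTable<String, Value> original = inputTopicStream.toTable();

    // 1. rekey by refNo, carry the original key inside the value, and merge payloads per refNo
    KTable<Key, Value> mergedByRefNo = original
        .groupBy((key, value) -> KeyValue.pair(new Key(value.getRefNo()), addKeyToValue(key, value)))
        .aggregate(
            Value::new,
            (refNo, value, agg) -> mergePayload(refNo, value, agg),
            (refNo, value, agg) -> handleSplit(refNo, value, agg));

    // 2. rekey back to the original key that was saved into the value in step 1
    KTable<String, Value> mergedByKey = mergedByRefNo
        .groupBy((refNo, value) -> KeyValue.pair(value.getKey(), value))
        .reduce((agg, value) -> value, (agg, value) -> agg);

    // 3. join with the original table, because grouping by refNo keeps only one record
    //    per refNo and the other original keys are lost along the way
    original.leftJoin(mergedByKey, (orig, merged) -> merged != null ? merged : orig)
        .toStream()
        .to(OUTPUT_TOPIC);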

I'm pretty sure there's an easier way to do this. What is the most optimized and elegant way to solve this issue?

Edit: This is downstream and there is a separate output topic; the original topic is not modified.


Solution

  • At the moment I'm going with this solution. It works, but I have no idea how it will perform, whether it can be optimized further, or if there is a better way to solve my issue.

    KStream<String, Value> even = inputTopicStream.filter((key, value) -> value.isEven());

    inputTopicStream.toTable(Materialized.with(Serdes.String(), Value.serde))
       .groupBy(
          (key, value) -> KeyValue.pair(new Key(value.getRefNo()), addKeyToValue(key, value)),
          Grouped.with("aggregation-internal", Key.serde, Value.serde))
       .aggregate(
          Value::new,
          (key, value, agg) -> mergePayload(key, value, agg),  // adder: ensure the aggregate keeps an uneven key after the merge
          (key, value, agg) -> handleSplit(key, value, agg))   // subtractor: undo the merge when a record is updated or deleted
       .toStream()
          .selectKey((key, value) -> value.getKey())           // restore the original key that was saved into the value
          .merge(even)                                         // merge the even-keyed stream back in, because those records were lost during aggregation
          .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Value.serde));
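
The helper methods referenced above are not shown in the answer. A rough sketch of what they might look like, under the assumption that Value carries the original key, the refNo and a list of payload items (all names and details here are assumptions, not code from the answer):

    import java.util.ArrayList;

    // Copy the original record key into the value so it survives re-keying by refNo.
    static Value addKeyToValue(String key, Value value) {
        value.setKey(key);
        return value;
    }

    // Adder: fold the incoming record into the aggregate for its refNo.
    // Preferring an uneven (odd) original key keeps the chosen key deterministic,
    // so the outcome does not depend on the order in which records arrive.
    static Value mergePayload(Key refNo, Value value, Value agg) {
        if (agg.getPayload() == null) {
            agg.setPayload(new ArrayList<>());
        }
        agg.setRefNo(value.getRefNo());
        agg.getPayload().addAll(value.getPayload());
        if (agg.getKey() == null || isUneven(value.getKey())) {
            agg.setKey(value.getKey());
        }
        return agg;
    }

    // Subtractor: remove the payload of a replaced or deleted record from the aggregate.
    static Value handleSplit(Key refNo, Value value, Value agg) {
        if (agg.getPayload() != null) {
            agg.getPayload().removeAll(value.getPayload());
        }
        return agg;
    }

    static boolean isUneven(String key) {
        return Integer.parseInt(key) % 2 != 0;
    }

Whether the merged payload itself also has to be order-independent (for example a sorted list or a set instead of append order) depends on how payload equality is defined, which the question leaves open.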