In the past I have used Spring Kafka Streams to do grouping and aggregating, and it worked pretty smoothly, but now I have a possible use case that I'm not sure how to get working as needed.
The use case I'm trying to solve for involves incoming messages that are linked via related but different ID fields: two messages belong to the same group if they share a value in either ID field, directly or transitively.
For example, these five messages:
{
  id1: 1222,
  id2: 1333
},
{
  id1: 1666,
  id2: 1777
},
{
  id1: 1222,
  id2: 1444
},
{
  id1: 1555,
  id2: 1444
},
{
  id1: 1888,
  id2: 1666
}
would result in 2 different groups, like so:
{
  id1: [1666, 1888],
  id2: [1777, 1666]
},
{
  id1: [1222, 1222, 1555],
  id2: [1333, 1444, 1444]
}
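To make the grouping rule concrete, here is a minimal, Kafka-free sketch of the transitive linking using a union-find over the raw ID values. The Message record and the grouper class are hypothetical illustrations, not part of my actual topology:

import java.util.*;

public class LinkedIdGrouper {
    record Message(String id1, String id2) {}

    private final Map<String, String> parent = new HashMap<>();

    // Union-find "find" with path compression over raw ID values.
    private String find(String x) {
        parent.putIfAbsent(x, x);
        String root = parent.get(x);
        if (!root.equals(x)) {
            root = find(root);
            parent.put(x, root); // compress the path for faster later lookups
        }
        return root;
    }

    public Map<String, List<Message>> group(List<Message> messages) {
        // Link each message's two IDs so a match in either field connects groups.
        for (Message m : messages) {
            parent.put(find(m.id1()), find(m.id2()));
        }
        // Bucket messages by the representative root of their (now linked) IDs.
        Map<String, List<Message>> groups = new HashMap<>();
        for (Message m : messages) {
            groups.computeIfAbsent(find(m.id1()), k -> new ArrayList<>()).add(m);
        }
        return groups;
    }
}

Feeding the five example messages through group(...) yields exactly the two groups shown above.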
The issue I'm having is that there doesn't seem to be a way to guarantee the linked IDs will land on the same Kafka partition when there is more than 1 (we usually have a minimum of 3), which breaks the stream grouping functionality. Since the incoming messages are relatively small, I am going to see what kind of throughput we can achieve as an essentially single-threaded app, but I figured I'd ask whether there are better ways to do this while maintaining a partition count greater than 1.
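For reference, one way to force co-location without reducing the topic's partition count is a custom StreamPartitioner that pins every record to one partition. This is just a sketch of that idea; it carries the same single-threaded ceiling as a 1-partition topic:

import org.apache.kafka.streams.processor.StreamPartitioner;

// Sketch: route every record to partition 0 so transitively linked IDs are
// always handled by the same stream task. Parallelism is lost, exactly as
// with a single-partition topic.
public class SinglePartitionPartitioner<K, V> implements StreamPartitioner<K, V> {
    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return 0;
    }
}

// Usage (hypothetical Serdes) when writing to the input topic:
// .to("input-topic", Produced.with(keySerde, valueSerde)
//         .withStreamPartitioner(new SinglePartitionPartitioner<>()));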
I was able to get this working by combining a normal groupBy and windowed aggregation with a processor after the windowing that checks the current state store and merges the incoming record into any existing entry that contains one of its tangential ID values.
Edit: this is still only using a single Kafka partition, as more than 1 was not viable.
Base Kafka Streams topology:
messageStream
    .groupByKey()
    .windowedBy(...)
    .aggregate(initializer, aggregator, merger, Materialized.with(...))
    .toStream()
    .process(transformerSupplier, SUPPRESSED_WINDOWED_TRANSACTION_KV_STORE)
    .to("xxxx", Produced.with(...));
with the transformerSupplier's processor implementing this process method:
@Override
public void process(Record record) {
    // Guard before casting; skip records with null keys or values outright.
    if (record == null || record.key() == null || record.value() == null) {
        return;
    }
    Windowed<KafkaStreamKey> outputKey = (Windowed<KafkaStreamKey>) record.key();
    TestKafkaStreamOutput outputValue = (TestKafkaStreamOutput) record.value();
    AtomicInteger counter = new AtomicInteger(0);
    if (log.isDebugEnabled()) {
        log.debug(String.format("Inserting to local keystore%nFull key [%s]%nValue [%s]",
            record.key(), record.value()));
    }
    outputValue.setWindowedEndTime(outputKey.window().endTime());
    // Gather this record's ID values once; any overlap with a stored entry,
    // in either field, means the record belongs to that entry's group.
    List<String> tangentialKeys = new ArrayList<>();
    tangentialKeys.addAll(outputValue.getId1());
    tangentialKeys.addAll(outputValue.getId2());
    // State store iterators hold resources; close them via try-with-resources.
    try (KeyValueIterator<String, TestKafkaStreamOutput> iterator = kvStore.all()) {
        iterator.forEachRemaining(entry -> {
            if (entry.value.getId1().stream().anyMatch(tangentialKeys::contains)
                    || entry.value.getId2().stream().anyMatch(tangentialKeys::contains)) {
                // Combine the existing store entry with the incoming record.
                List<String> newId1List = new ArrayList<>(entry.value.getId1());
                newId1List.addAll(outputValue.getId1());
                List<String> newId2List = new ArrayList<>(entry.value.getId2());
                newId2List.addAll(outputValue.getId2());
                kvStore.put(entry.key,
                    TestKafkaStreamOutput.builder()
                        .Id1(newId1List.stream().sorted().distinct().toList())
                        .Id2(newId2List.stream().sorted().distinct().toList())
                        .windowedEndTime(outputValue.getWindowedEndTime())
                        .build());
                counter.getAndIncrement();
            }
        });
    }
    if (counter.get() == 0) {
        // No overlap with any existing group: add to the kvStore as a new group.
        kvStore.put(outputKey.key().getValue(), outputValue);
    }
}
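For completeness, the processor above only sees kvStore if the store is registered on the topology and looked up in init(). A minimal sketch of that wiring, assuming a persistent store and a placeholder value Serde:

// Register the state store on the StreamsBuilder so that process(...) in the
// topology can reference it by name.
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore(SUPPRESSED_WINDOWED_TRANSACTION_KV_STORE),
    Serdes.String(),
    testKafkaStreamOutputSerde)); // placeholder Serde for TestKafkaStreamOutput

// Inside the Processor implementation, fetch the store once during init():
@Override
public void init(ProcessorContext context) {
    this.kvStore = context.getStateStore(SUPPRESSED_WINDOWED_TRANSACTION_KV_STORE);
}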