apache-kafka-streams

Spring Kafka Streams Grouping Without Common Key


In the past I have used Spring Kafka Streams for grouping and aggregating, and it worked pretty smoothly, but now I have a possible use case that I'm not sure how to get working as needed.

The use case I'm trying to solve involves incoming messages that are linked via related but different ID fields, where the relation could be any of the following:

  • id1 = id1
  • id1 = id2
  • id2 = id2
  • id2 = id1

For example:

{
  id1: 1222,
  id2: 1333
},
{
  id1: 1666,
  id2: 1777
},
{
  id1: 1222,
  id2: 1444
},
{
  id1: 1555,
  id2: 1444
},
{
  id1: 1888,
  id2: 1666
}

Would result in 2 different groups, as such:

{
  id1: [1666, 1888],
  id2: [1777, 1666]
},
{
  id1: [1222, 1222, 1555],
  id2: [1333, 1444, 1444]
}
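The grouping above is effectively a connected-components problem over the ID pairs: any two records that share a value in either field belong to the same group. Outside of Kafka, a minimal sketch of that logic with a union-find structure (class and method names here are hypothetical, not from the original topology) might look like:

```java
import java.util.*;

public class IdGrouper {
    // Union-find over ID strings: IDs that appear in the same record, or that
    // share a value across records, end up under the same root.
    private final Map<String, String> parent = new HashMap<>();

    private String find(String id) {
        parent.putIfAbsent(id, id);
        String root = parent.get(id);
        if (!root.equals(id)) {
            root = find(root);
            parent.put(id, root); // path compression
        }
        return root;
    }

    public void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    // Each record is a {id1, id2} pair; the result maps a group root to
    // every ID in that connected component.
    public Map<String, List<String>> groups(List<String[]> records) {
        for (String[] r : records) union(r[0], r[1]);
        Map<String, List<String>> out = new HashMap<>();
        for (String id : parent.keySet()) {
            out.computeIfAbsent(find(id), k -> new ArrayList<>()).add(id);
        }
        return out;
    }

    public static void main(String[] args) {
        IdGrouper g = new IdGrouper();
        List<String[]> records = List.of(
            new String[]{"1222", "1333"},
            new String[]{"1666", "1777"},
            new String[]{"1222", "1444"},
            new String[]{"1555", "1444"},
            new String[]{"1888", "1666"});
        System.out.println(g.groups(records).size()); // prints 2
    }
}
```

Running it on the five example records yields the two components {1222, 1333, 1444, 1555} and {1666, 1777, 1888}, matching the expected output above.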

The issue I'm having is that there doesn't seem to be a way to guarantee that the linked IDs will go to the same Kafka partition when we have more than 1 (we usually have a minimum of 3), which would break the stream grouping functionality. Since the incoming messages are relatively small, I am going to see what kind of throughput we can achieve as an essentially single-threaded app, but I figured I'd ask whether there are better ways to do this that also maintain a partition count greater than 1.
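To see why co-partitioning can't be guaranteed here, consider how a hash partitioner assigns keys. The sketch below uses `String.hashCode()` as a stand-in for Kafka's actual murmur2-based default partitioner (so the exact partition numbers are illustrative only); the point is that two IDs linked inside one record can hash to different partitions when keyed independently:

```java
public class PartitionSketch {
    // Stand-in for hash partitioning; Kafka's default partitioner actually
    // uses murmur2 over the serialized key bytes, not String.hashCode().
    static int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 4; // hypothetical partition count
        // The record {id1: 1222, id2: 1444} relates these two IDs, but
        // keying by each ID independently can land on different partitions,
        // so a later record keyed by 1444 may never meet this group.
        System.out.println(partition("1222", partitions));
        System.out.println(partition("1444", partitions));
    }
}
```

Because each record can only be keyed by one of its IDs at produce time, no fixed keying scheme can guarantee that all transitively related records co-locate, which is what forces the single-partition fallback described in the solution below.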


Solution

  • I was able to get this working with a combination of a normal groupBy and aggregation, plus a processor after the windowing that checks the current stateStore and updates an entry if it contains any of the tangential values.

    Edit: this still uses only a single Kafka partition, as more than 1 was not viable.

    Base Kafka stream topology

    messageStream
        .groupByKey()
        .windowedBy(...)
        .aggregate(initializer, aggregator, merger, Materialized.with(...))
        .toStream()
        .process(transformerSupplier, SUPPRESSED_WINDOWED_TRANSACTION_KV_STORE)
        .to("xxxx", Produced.with(...));
    

    With the transformerSupplier implementing the following process method:

    @Override
    public void process(Record record) {
      // Guard before touching the record; the original logged and cast first,
      // which would NPE on a null record.
      if (record == null || record.key() == null || record.value() == null) {
        return;
      }

      Windowed<KafkaStreamKey> outputKey = (Windowed<KafkaStreamKey>) record.key();
      TestKafkaStreamOutput outputValue = (TestKafkaStreamOutput) record.value();
      AtomicInteger counter = new AtomicInteger(0);

      if (log.isDebugEnabled()) {
        log.debug(String.format("Inserting to local keystore%nFull key [%s]%nValue [%s]",
            record.key(), record.value()));
      }

      outputValue.setWindowedEndTime(outputKey.window().endTime());

      // Collect the incoming record's IDs once, then scan the store for overlaps
      List<String> tangentialKeys = new ArrayList<>();
      tangentialKeys.addAll(outputValue.getId1());
      tangentialKeys.addAll(outputValue.getId2());

      kvStore.all().forEachRemaining(entry -> {
        if (entry.value.getId1().stream().anyMatch(tangentialKeys::contains)
            || entry.value.getId2().stream().anyMatch(tangentialKeys::contains)) {
          // Combine the incoming record with the existing store entry
          List<String> newId1List = new ArrayList<>(entry.value.getId1());
          newId1List.addAll(outputValue.getId1());

          List<String> newId2List = new ArrayList<>(entry.value.getId2());
          newId2List.addAll(outputValue.getId2());

          kvStore.put(entry.key,
              TestKafkaStreamOutput.builder()
                  .Id1(newId1List.stream().sorted().distinct().toList())
                  .Id2(newId2List.stream().sorted().distinct().toList())
                  .windowedEndTime(outputValue.getWindowedEndTime())
                  .build());
          counter.getAndIncrement();
        }
      });

      if (counter.get() == 0) {
        // No overlapping group found, so insert as a new entry
        kvStore.put(outputKey.key().getValue(), outputValue);
      }
    }
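The combine step above just unions the existing and incoming ID lists and de-duplicates them via `sorted().distinct()`. In isolation (with a hypothetical `merge` helper standing in for the builder calls) it behaves like this:

```java
import java.util.ArrayList;
import java.util.List;

public class MergeSketch {
    // Mirrors the combine step in the processor: concatenate the existing and
    // incoming ID lists, then sort and de-duplicate.
    static List<String> merge(List<String> existing, List<String> incoming) {
        List<String> merged = new ArrayList<>(existing);
        merged.addAll(incoming);
        return merged.stream().sorted().distinct().toList();
    }

    public static void main(String[] args) {
        // An existing entry's id2 list merged with an incoming record's id2 list
        System.out.println(merge(List.of("1333", "1444"), List.of("1444")));
        // prints [1333, 1444]
    }
}
```

Note that because the resulting lists are de-duplicated, repeated IDs such as the `[1222, 1222, ...]` in the question's expected output would collapse to a single entry under this scheme.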