
How can I group a Kafka Streams KStream by key and build a global KTable with a String key and a list of objects as the value?


I have a table with the following values:

SCHEME  RULEORDER  REGEX
VS      1          0*[2368A]
MC      1          0*[23]
MC      2          0*
ZS      1          9*
MC      3          22*

Each row of this table is sent as a record on a KStream:


`ruleStream
      .peek((k, v) -> logReceivedRecord("RULE", k, Optional.ofNullable(v).map(RuleCdc::getUPDATETS).orElse(null)))
      .filter((k, v) -> Objects.nonNull(v), Named.as("rule-nonnull-filter"))
      .map((k, v) -> new KeyValue<>(v.getSCHEME()+ "-"+ v.getRULEORDER(), mapper.mapconfig(v, initializer.initConfig(k))))
      .peek((k, v) -> logSentRecord("RuleConfig", k, getTopicNameWithRegion(TOPIC_NAME)));`


Here the key is the combination v.getSCHEME() + "-" + v.getRULEORDER(), so that when the KTable is created it keeps every RULEORDER belonging to a SCHEME. If I used just v.getSCHEME() as the key, each new record would overwrite the previous one and only the latest RULEORDER would be kept for a given SCHEME.
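
To make the effect of the key choice concrete, here is a minimal plain-Java sketch. It does not use Kafka Streams at all: a `LinkedHashMap` stands in for the table's latest-value-per-key view, and the class name `KeyChoiceSketch` and the method `upsertAll` are hypothetical names chosen for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; a Map stands in for the table's
// latest-value-per-key view. No Kafka Streams classes are used.
final class KeyChoiceSketch {

    // The five sample rows as {SCHEME, RULEORDER, REGEX}, in arrival order.
    static final String[][] ROWS = {
        {"VS", "1", "0*[2368A]"},
        {"MC", "1", "0*[23]"},
        {"MC", "2", "0*"},
        {"ZS", "1", "9*"},
        {"MC", "3", "22*"},
    };

    // Upserts every row; compositeKey selects "SCHEME-RULEORDER" over plain "SCHEME".
    static Map<String, String> upsertAll(boolean compositeKey) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] row : ROWS) {
            String key = compositeKey ? row[0] + "-" + row[1] : row[0];
            // A later record with the same key replaces the earlier value.
            table.put(key, row[2]);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println("composite key: " + upsertAll(true));
        System.out.println("scheme key:    " + upsertAll(false));
    }
}
```

With the composite key all five rows survive as separate entries; with the plain SCHEME key only three entries remain, and "MC" holds just the regex of the last MC row.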

My problem: the records arrive one at a time on the KStream, and I want to collect them into a list and aggregate them so that for each key (SCHEME) I have a list of all its RULEORDER entries.

I tried the following, but it does not work as expected:


`StreamsBuilder builder = new StreamsBuilder();
KStream<String , RuleConfig> ruleConfigKStream = 
        builder.stream(TOPIC_NAME,Consumed.with(stringSerde, ruleConfigSerde));

KGroupedStream<String , RuleConfig> groupedKStream = 
        ruleConfigKStream.groupBy((key, value) -> value.getScheme(),   
        Grouped.with(Serdes.String(),ruleConfigSerde));

KTable<String,List<RuleConfig>> ruleStore  =
        groupedKStream.aggregate(()-> new ArrayList<>(), 
        (key,value,list) -> { list.add(value) ;
        Materialized.<String, List<RuleConfig>, KeyValueStore<Bytes,byte[]>> as (RULE_STORE)
        .withKeySerde(stringSerde).withValueSerde(listSerde);
        return list;
        });

builder.build();

ReadOnlyKeyValueStore<String, List<RuleConfig>> ruleKVStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(RULE_STORE, 
        QueryableStoreTypes.keyValueStore()));


ruleKVStore.get("MC"); --> **This is not giving expected result**`


Solution

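  • Your call to Materialized.as is misplaced: it sits inside the aggregator lambda, where the Materialized object it builds is discarded, so aggregate never receives the RULE_STORE name or the serdes. Pass it as the third argument to aggregate instead: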

    groupedKStream.aggregate(ArrayList::new,
        (key, value, list) -> { list.add(value); return list; },
        Materialized.<String, List<RuleConfig>, KeyValueStore<Bytes, byte[]>>as(RULE_STORE)
            .withKeySerde(stringSerde).withValueSerde(listSerde)
        );
    

    Other than that, your approach should work.
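
As an illustration of what the grouping and aggregation compute for the sample rows in the question, here is a plain-Java model. It is only a sketch and uses no Kafka Streams classes: a `LinkedHashMap` plays the role of the state store, and `SchemeAggregationSketch`, `Rule`, `sampleRows` and `aggregateByScheme` are hypothetical names introduced for the example (`Rule` is a reduced stand-in for RuleConfig).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of the per-key aggregation; no Kafka Streams classes are involved.
final class SchemeAggregationSketch {

    // Hypothetical stand-in for RuleConfig, reduced to the three table columns.
    static final class Rule {
        final String scheme;
        final int ruleOrder;
        final String regex;

        Rule(String scheme, int ruleOrder, String regex) {
            this.scheme = scheme;
            this.ruleOrder = ruleOrder;
            this.regex = regex;
        }
    }

    // The sample rows from the question, in arrival order.
    static List<Rule> sampleRows() {
        List<Rule> rows = new ArrayList<>();
        rows.add(new Rule("VS", 1, "0*[2368A]"));
        rows.add(new Rule("MC", 1, "0*[23]"));
        rows.add(new Rule("MC", 2, "0*"));
        rows.add(new Rule("ZS", 1, "9*"));
        rows.add(new Rule("MC", 3, "22*"));
        return rows;
    }

    // Models grouping by scheme followed by an "append to list" aggregation:
    // the initializer runs once per new key, then each record is appended.
    static Map<String, List<Rule>> aggregateByScheme(List<Rule> records) {
        Supplier<List<Rule>> initializer = ArrayList::new;
        Map<String, List<Rule>> store = new LinkedHashMap<>();
        for (Rule value : records) {
            String key = value.scheme;        // the new grouping key
            List<Rule> list = store.get(key);
            if (list == null) {
                list = initializer.get();     // first record seen for this key
            }
            list.add(value);                  // aggregator body: append and return
            store.put(key, list);             // updated aggregate for the key
        }
        return store;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, List<Rule>> e : aggregateByScheme(sampleRows()).entrySet()) {
            StringBuilder orders = new StringBuilder();
            for (Rule r : e.getValue()) {
                orders.append(r.ruleOrder).append(' ');
            }
            System.out.println(e.getKey() + " -> " + orders.toString().trim());
        }
    }
}
```

In this model "MC" ends up with a list of three rules (RULEORDER 1, 2 and 3), while "VS" and "ZS" each map to a single-element list, which is the shape a lookup of "MC" in the store is meant to return.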