I have a table with the following values:
SCHEME | RULEORDER | REGEX |
---|---|---|
VS | 1 | 0*[2368A] |
MC | 1 | 0*[23] |
MC | 2 | 0* |
ZS | 1 | 9* |
MC | 3 | 22* |
Every row of this table is sent as a record on a KStream:
`ruleStream
    .peek((k, v) -> logReceivedRecord("RULE", k, Optional.ofNullable(v).map(RuleCdc::getUPDATETS).orElse(null)))
    .filter((k, v) -> Objects.nonNull(v), Named.as("rule-nonnull-filter"))
    .map((k, v) -> new KeyValue<>(v.getSCHEME() + "-" + v.getRULEORDER(), mapper.mapconfig(v, initializer.initConfig(k))))
    .peek((k, v) -> logSentRecord("RuleConfig", k, getTopicNameWithRegion(TOPIC_NAME)));`
The key is the combination `v.getSCHEME() + "-" + v.getRULEORDER()` so that, when a KTable is created from the topic, every RULEORDER belonging to a SCHEME is retained. If I used just `v.getSCHEME()` as the key, each new record would overwrite the previous one and only the latest RULEORDER per SCHEME would remain.
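To make that keying trade-off concrete, here is a minimal plain-Java sketch. It is not Kafka Streams code: a `Map` stands in for the KTable's upsert behaviour, and the `KeyingSketch` and `Rule` names are hypothetical, introduced only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only: a Map stands in for a KTable, where a later
// record with the same key overwrites the earlier one (upsert).
class KeyingSketch {

    // Hypothetical stand-in for one table row (SCHEME, RULEORDER).
    static final class Rule {
        final String scheme;
        final int ruleOrder;

        Rule(String scheme, int ruleOrder) {
            this.scheme = scheme;
            this.ruleOrder = ruleOrder;
        }
    }

    // The five rows from the table, in arrival order.
    static List<Rule> sampleRows() {
        return Arrays.asList(
            new Rule("VS", 1), new Rule("MC", 1), new Rule("MC", 2),
            new Rule("ZS", 1), new Rule("MC", 3));
    }

    // Key = SCHEME: each scheme keeps only its most recent RULEORDER.
    static Map<String, Integer> keyedByScheme(List<Rule> rows) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Rule r : rows) {
            table.put(r.scheme, r.ruleOrder);
        }
        return table;
    }

    // Key = SCHEME-RULEORDER: every row survives under its own key.
    static Map<String, Integer> keyedByComposite(List<Rule> rows) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Rule r : rows) {
            table.put(r.scheme + "-" + r.ruleOrder, r.ruleOrder);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(keyedByScheme(sampleRows()));    // {VS=1, MC=3, ZS=1}
        System.out.println(keyedByComposite(sampleRows())); // {VS-1=1, MC-1=1, MC-2=2, ZS-1=1, MC-3=3}
    }
}
```

With the composite key all five rows are kept, but nothing groups the three MC entries together, which is what the aggregation described next is meant to do.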
My problem is this: I want to collect the values that arrive one by one on the KStream and aggregate them so that, for a given key (SCHEME), I have a list of all its RULEORDERs.
I tried the following, but it does not work as expected:
`StreamsBuilder builder = new StreamsBuilder();
KStream<String, RuleConfig> ruleConfigKStream =
    builder.stream(TOPIC_NAME, Consumed.with(stringSerde, ruleConfigSerde));
KGroupedStream<String, RuleConfig> groupedKStream =
    ruleConfigKStream.groupBy((key, value) -> value.getScheme(),
        Grouped.with(Serdes.String(), ruleConfigSerde));
KTable<String, List<RuleConfig>> ruleStore =
    groupedKStream.aggregate(() -> new ArrayList<>(),
        (key, value, list) -> {
            list.add(value);
            Materialized.<String, List<RuleConfig>, KeyValueStore<Bytes, byte[]>>as(RULE_STORE)
                .withKeySerde(stringSerde).withValueSerde(listSerde);
            return list;
        });
builder.build();
ReadOnlyKeyValueStore<String, List<RuleConfig>> ruleKVStore =
    kafkaStreams.store(StoreQueryParameters.fromNameAndType(RULE_STORE,
        QueryableStoreTypes.keyValueStore()));
ruleKVStore.get("MC"); --> **This is not giving the expected result**`
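I think your call to `Materialized.as` is just slightly misplaced. It sits inside the body of the aggregator lambda, where the `Materialized` object is built and then discarded, so the state store is never named `RULE_STORE` or given your serdes. It belongs as the third argument to `aggregate`: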
`groupedKStream.aggregate(ArrayList::new,
    (key, value, list) -> { list.add(value); return list; },
    Materialized.<String, List<RuleConfig>, KeyValueStore<Bytes, byte[]>>as(RULE_STORE)
        .withKeySerde(stringSerde).withValueSerde(listSerde)
);`
Other than that, your approach should work.
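If it helps to see what the initializer and aggregator are expected to produce, here is a rough plain-Java model of the same fold. It is only a sketch under assumptions: it does not use Kafka Streams, a `LinkedHashMap` stands in for the state store, integers stand in for `RuleConfig` values, and the `AggregateSketch`, `Aggregator`, and `rule` names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of groupBy(...).aggregate(initializer, aggregator).
// Not Kafka Streams code: the Map below plays the role of the state store.
class AggregateSketch {

    // Same shape as the aggregator lambda: (key, value, aggregate) -> new aggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         Aggregator<K, V, A> aggregator) {
        Map<K, A> store = new LinkedHashMap<>();
        for (Map.Entry<K, V> rec : records) {
            K key = rec.getKey();
            // The first record for a key starts from the initializer's value.
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            // The aggregator folds the value in; the result is written back.
            store.put(key, aggregator.apply(key, rec.getValue(), current));
        }
        return store;
    }

    // Hypothetical helper: one record already re-keyed by SCHEME.
    static Map.Entry<String, Integer> rule(String scheme, int ruleOrder) {
        return new AbstractMap.SimpleEntry<>(scheme, ruleOrder);
    }

    static Map<String, List<Integer>> ruleOrdersByScheme() {
        List<Map.Entry<String, Integer>> records = Arrays.asList(
            rule("VS", 1), rule("MC", 1), rule("MC", 2), rule("ZS", 1), rule("MC", 3));
        return AggregateSketch.<String, Integer, List<Integer>>aggregate(
            records,
            ArrayList::new,
            (key, value, list) -> { list.add(value); return list; });
    }

    public static void main(String[] args) {
        System.out.println(ruleOrdersByScheme()); // {VS=[1], MC=[1, 2, 3], ZS=[1]}
    }
}
```

The aggregator here does nothing except add the value and return the list. Store naming and serdes are configured outside it, which mirrors why `Materialized` has to be a separate argument to `aggregate` rather than a statement inside the lambda.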