I know Kafka Streams allows data to be distributed to multiple topics based on specified predicates, and the Kafka Streams binder supports this using both the @StreamListener and the functional binding approach.
...
// return type KStream<?, WordCount>[]
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input.
...
branch(isEnglish, isFrench, isSpanish);
I'm wondering how to transform either the key or value of one of the branches before returning the data. Say I'd like one of the branches to have a different key type from the others.
Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo = (k, v) -> v.important.equals("2");
KStream<Key, Value>[] branches = input.branch(isOne, isTwo);
KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);
I thought of creating a new KStream<?, Value>[] array with both streams but couldn't because of the generic array creation error.
I know this is possible because, as the documentation excerpt below shows, different key/value serdes can be specified for each branch's producer.
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde
Thanks for helping.
One option is to create a side topic. Every record that is no longer a WordCount is sent to that side topic, while records that are still a WordCount stay on the topics of the branches.
I created this example based on the Spring Cloud Stream samples with KStream. It is not working code as written, but the idea works. I have a similar example that consumes different Order objects and sends the invalid Orders to an error topic.
return input -> {
    KStream<?, WordCount> intermediateStream = input
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(Duration.ofSeconds(6)))
            .count(Materialized.as("WordCounts-1"))
            .toStream()
            .map((key, value) -> new KeyValue<>(null,
                    new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    // Send the records that need a different shape to SIDE_TOPIC, serialized with JsonSerde.
    intermediateStream
            .filter((k, v) -> `create another filter`)
            .map((k, v) -> `transform only this stream`)
            .to(SIDE_TOPIC, Produced.with(CustomSerdes.String(), new JsonSerde(....)));
    // The branches keep using the serdes configured for the output bindings.
    return intermediateStream.branch(isEnglish, isFrench, isSpanish);
};
This is a common approach when the data you consume contains errors or null values that you want to send to a side topic, for example an error topic. You can then still store those events and analyze them later.
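As for the generic array creation error mentioned in the question: Java rejects `new KStream<?, Value>[2]` because the element type is not reifiable, but it does accept a raw array that is then converted with an unchecked warning. The sketch below shows only that language-level workaround. `DemoStream` is a hypothetical stand-in for KStream (so the example runs without Kafka on the classpath), and `toArray` is an illustrative helper, not part of any library.

```java
import java.util.function.BiFunction;

public class BranchArrayDemo {

    /** Hypothetical stand-in for KStream<K, V>; it holds one record to keep the sketch small. */
    static final class DemoStream<K, V> {
        final K key;
        final V value;

        DemoStream(K key, V value) {
            this.key = key;
            this.value = value;
        }

        /** Same shape as selectKey: derives a new key, possibly of a different type. */
        <K2> DemoStream<K2, V> selectKey(BiFunction<? super K, ? super V, ? extends K2> mapper) {
            return new DemoStream<>(mapper.apply(key, value), value);
        }
    }

    /** Packs two streams with different key types into one array (illustrative helper). */
    @SuppressWarnings("unchecked")
    static <V> DemoStream<?, V>[] toArray(DemoStream<?, V> first, DemoStream<?, V> second) {
        // `new DemoStream<?, V>[2]` fails with "generic array creation",
        // so a raw array is created and converted with an unchecked warning.
        return new DemoStream[] {first, second};
    }

    public static void main(String[] args) {
        DemoStream<Integer, String> one = new DemoStream<>(1, "alpha");
        DemoStream<Integer, String> two = new DemoStream<>(2, "beta");

        // Re-key only the first branch; its key type changes from Integer to String.
        DemoStream<String, String> rekeyed = one.selectKey((k, v) -> v.toUpperCase());

        DemoStream<?, String>[] out = toArray(rekeyed, two);
        System.out.println(out[0].key + " " + out[1].key); // prints "ALPHA 2"
    }
}
```

With the real types the same pattern would read `new KStream[] {one, branches[1]}`, assigned to a `KStream<?, Value>[]`. The compiler no longer checks the element types, so the key serde configured for each output binding has to match the key type that branch actually carries.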