
Multiple Output Bindings (AKA Branching) In Spring Cloud Stream With Different Key/Value


I know Kafka Streams allows data to be distributed to multiple topics based on specified predicates, and that the Kafka Streams binder supports this with both the @StreamListener and the functional binding approaches.

...
// return type KStream<?, WordCount>[]

Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

return input
    ...
    .branch(isEnglish, isFrench, isSpanish);
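To make the routing rule concrete, here is a plain-Java model (not the Kafka Streams API) of what `branch` does with predicates like the ones above: each record lands in the first branch whose predicate matches, and a record that matches none is dropped. `BranchDemo`, `demo()` and the simplified `WordCount` class are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of the routing rule behind KStream#branch (NOT the Kafka
// Streams API): each record goes to the FIRST branch whose predicate matches,
// and a record that matches no predicate is dropped.
class BranchDemo {

    // Hypothetical stand-in for the question's WordCount value type.
    static final class WordCount {
        final String word;
        final long count;

        WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }

    @SafeVarargs
    static <K, V> List<List<Map.Entry<K, V>>> branch(
            List<Map.Entry<K, V>> records, BiPredicate<K, V>... predicates) {
        List<List<Map.Entry<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record.getKey(), record.getValue())) {
                    branches.get(i).add(record);
                    break; // first match wins; later predicates are not tried
                }
            }
            // a record that matched nothing is not added to any branch
        }
        return branches;
    }

    static List<List<Map.Entry<String, WordCount>>> demo() {
        BiPredicate<String, WordCount> isEnglish = (k, v) -> v.word.equals("english");
        BiPredicate<String, WordCount> isFrench = (k, v) -> v.word.equals("french");
        BiPredicate<String, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

        List<Map.Entry<String, WordCount>> input = List.of(
                Map.entry("k1", new WordCount("english", 3)),
                Map.entry("k2", new WordCount("french", 1)),
                Map.entry("k3", new WordCount("german", 2)), // matches no predicate
                Map.entry("k4", new WordCount("english", 5)),
                Map.entry("k5", new WordCount("spanish", 4)));

        return branch(input, isEnglish, isFrench, isSpanish);
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, WordCount>>> branches = demo();
        for (int i = 0; i < branches.size(); i++) {
            System.out.println("branch " + i + ": " + branches.get(i).size() + " record(s)");
        }
    }
}
```

Running `main` reports 2, 1 and 1 records for the English, French and Spanish branches; the `german` record is silently dropped.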

I'm wondering how to transform either the key or value of one of the branches before returning the data. Say I'd like one of the branches to have a different key type from the others.

Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo =  (k, v) -> v.important.equals("2");

KStream<Key, Value>[] branches = input.branch(isOne, isTwo);

KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);

I thought of creating a new KStream<?, Value>[] array holding both streams, but couldn't because of a generic array creation error.
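The generic array creation error comes from a Java rule rather than from Kafka: an array's element type must be reifiable, and `KStream<?, Value>` is not, whereas a type that uses only unbounded wildcards, such as `KStream<?, ?>`, is. So something like `return new KStream<?, ?>[] { one, branches[1] };` should compile, assuming the binder accepts a function declared to return `KStream<?, ?>[]` (not verified here). The sketch below shows the same rule with `java.util.Map` standing in for a keyed stream; `WildcardArrayDemo` and its data are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of the array-creation rule behind the question.
// `new Map<?, Integer>[2]` would not compile ("generic array creation"), but an
// element type that uses only unbounded wildcards is reifiable, so
// `new Map<?, ?>[...]` is legal. The same rule applies to KStream<?, ?>[].
class WildcardArrayDemo {

    static Map<?, ?>[] twoBranches() {
        // Branch one has been re-keyed by a String (like selectKey in the question) ...
        Map<String, Integer> one = new LinkedHashMap<>();
        one.put("important-a", 1);

        // ... while branch two keeps its original Long keys.
        Map<Long, Integer> two = new LinkedHashMap<>();
        two.put(42L, 2);

        // Map<?, Integer>[] bad = new Map<?, Integer>[2]; // compile error: generic array creation
        return new Map<?, ?>[] { one, two };
    }

    public static void main(String[] args) {
        for (Map<?, ?> branch : twoBranches()) {
            // Print the runtime key type of each branch.
            System.out.println(branch.keySet().iterator().next().getClass().getSimpleName());
        }
    }
}
```

Running `main` prints `String` and then `Long`: both branches live in one array even though their key types differ, at the cost of losing the static key type on each element.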

I know this should be possible: as the documentation excerpt below shows, a different key/value serde can be specified for each branch's producer.

spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde

Thanks for helping.


Solution

  • One option is to create a side topic: every record that is no longer a WordCount goes to that side topic, while records that are still a WordCount stay on the branches' topics.

    I created this example based on the Spring Cloud Stream samples for KStream. It is not a working example, but the idea works. I have a similar example that consumes different Order objects and sends invalid Orders to an error topic.

    return input -> {
                KStream<?, WordCount> intermediateStream = input
                        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                        .groupBy((key, value) -> value)
                        .windowedBy(TimeWindows.of(Duration.ofSeconds(6)))
                        .count(Materialized.as("WordCounts-1"))
                        .toStream()
                        .map((key, value) -> new KeyValue<>(null,
                                new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    
                // here we send the side records to SIDE_TOPIC with a JsonSerde
                intermediateStream
                        .filter((k, v) -> `create another filter`)
                        .map((k, v) -> `transform only this stream`)
                        .to(SIDE_TOPIC, Produced.with(CustomSerdes.String(), new JsonSerde(....)));
    
                // here the branches keep using the serdes configured for their bindings
                return intermediateStream.branch(isEnglish, isFrench, isSpanish);
            };
    

    This is a common approach when you consume data containing errors or null values that you want to route to a side topic, e.g. an error topic, so you can keep those events and analyze them later.
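A plain-Java model of the side-topic split, under stated assumptions: the `WordCount` class, the blank-word error rule and the text format of the side records are all hypothetical, and none of this is the Kafka Streams API. Each record either goes to a side list after being transformed into a different type (mirroring `filter(...).map(...).to(SIDE_TOPIC, ...)`) or stays a `WordCount` on the main list. In the answer's code, `branch` runs on the whole `intermediateStream`, so side records can also reach a branch; a complementary `filterNot(...)` before `branch(...)` would keep the two paths apart, which is what the `else` path models.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the side-topic idea (NOT the Kafka Streams API):
// records matching `isSide` are transformed to a different type and collected
// on a "side topic"; everything else stays a WordCount for the normal branches.
class SideTopicDemo {

    // Hypothetical stand-in for the question's WordCount value type.
    static final class WordCount {
        final String word;
        final long count;

        WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }

    // Result holder: the side topic carries Strings, the main path keeps WordCounts.
    static final class Split {
        final List<String> sideTopic = new ArrayList<>();
        final List<WordCount> mainStream = new ArrayList<>();
    }

    static Split route(List<WordCount> records, Predicate<WordCount> isSide) {
        Split split = new Split();
        for (WordCount wc : records) {
            if (isSide.test(wc)) {
                // like filter(...).map(...).to(SIDE_TOPIC, ...): only this path changes type
                split.sideTopic.add("invalid word count: " + wc.word + "=" + wc.count);
            } else {
                // like filterNot(...): the complement continues on to the branches
                split.mainStream.add(wc);
            }
        }
        return split;
    }

    static Split demo() {
        List<WordCount> input = List.of(
                new WordCount("english", 3),
                new WordCount("", 0), // hypothetical "bad" record
                new WordCount("french", 1));
        // Hypothetical rule: a blank word marks an error record.
        return route(input, wc -> wc.word.isBlank());
    }

    public static void main(String[] args) {
        Split split = demo();
        System.out.println("side topic: " + split.sideTopic);
        System.out.println("main stream size: " + split.mainStream.size());
    }
}
```

Keeping the type change on the side path only means the branches never see a value they cannot serialize with their configured serde.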