Tags: java, apache-kafka-streams, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

How to do this topology in Spring Cloud Kafka Streams in function style?


var streamsBuilder = new StreamsBuilder();
KStream<String, String> inputStream = streamsBuilder.stream("input_topic");

KStream<String, String> upperCaseString =
        inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);

upperCaseString.mapValues(v -> v + "postfix").to("with_postfix_topic");
upperCaseString.mapValues(v -> "prefix" + v).to("with_prefix_topic");

Topology topology = streamsBuilder.build();

I can write three function beans. The first bean would uppercase the values and write the result to an intermediate topic ('upper_case_topic'). The other two beans would consume that result from 'upper_case_topic' and add the prefix/postfix. But how can I do this without writing to the intermediate topic ('upper_case_topic')?
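
For reference, the intermediate-topic wiring described above would look roughly like this (the bean names uppercase, postfixed and prefixed are only illustrative):

spring.cloud.function.definition: uppercase;postfixed;prefixed
spring.cloud.stream.bindings.uppercase-in-0.destination: input_topic
spring.cloud.stream.bindings.uppercase-out-0.destination: upper_case_topic
spring.cloud.stream.bindings.postfixed-in-0.destination: upper_case_topic
spring.cloud.stream.bindings.postfixed-out-0.destination: with_postfix_topic
spring.cloud.stream.bindings.prefixed-in-0.destination: upper_case_topic
spring.cloud.stream.bindings.prefixed-out-0.destination: with_prefix_topic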


UPDATE: Here is a possible solution:

@Bean
public Consumer<KStream<String, String>> process() {
    return input -> {
        KStream<String, String> upperCaseStream =
                input.mapValues((ValueMapper<String, String>) String::toUpperCase);

        upperCaseStream.mapValues(v -> v + " 111").to("new_topic_1");

        upperCaseStream.mapValues(v -> v + " 222").to("new_topic_2");
    };
}

Solution

  • Here are some options you can try.

    Option 1 - Use StreamBridge from Spring Cloud Stream

    @Bean
    public Consumer<KStream<String, String>> process() {
        // streamBridge is an injected org.springframework.cloud.stream.function.StreamBridge
        return input -> {
            KStream<String, String> upperCaseString =
                    input.mapValues((ValueMapper<String, String>) String::toUpperCase);

            upperCaseString.foreach((key, value) -> {
                streamBridge.send("with_postfix_topic", value + "postfix");
                streamBridge.send("with_prefix_topic", "prefix" + value);
            });
        };
    }
    

    One downside of the above approach is that you need both the Kafka and Kafka Streams binders from Spring Cloud Stream to make it work. Another issue is that you lose the end-to-end semantics that Kafka Streams natively provides, since you are sending directly to Kafka topics from within the business logic. Depending on your use case, this approach might be fine.
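
    For completeness, here is a minimal sketch of how StreamBridge could be wired into that consumer bean via constructor injection (the configuration class name is only illustrative):

    import java.util.function.Consumer;

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.ValueMapper;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class UppercaseStreamConfig {

        private final StreamBridge streamBridge;

        public UppercaseStreamConfig(StreamBridge streamBridge) {
            this.streamBridge = streamBridge;
        }

        @Bean
        public Consumer<KStream<String, String>> process() {
            return input -> input
                    .mapValues((ValueMapper<String, String>) String::toUpperCase)
                    .foreach((key, value) -> {
                        // StreamBridge sends through the message-channel Kafka binder,
                        // not through the Kafka Streams topology
                        streamBridge.send("with_postfix_topic", value + "postfix");
                        streamBridge.send("with_prefix_topic", "prefix" + value);
                    });
        }
    }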

    Option 2 - Use KStream[] on the outbound

    You usually use KStream[] on the outbound with the branching feature of the Kafka Streams API, but you can leverage the output binding support that Spring Cloud Stream builds on top of that branching feature as a workaround for your use case. Here is an idea that you can try.

    @Bean
    public Function<KStream<String, String>, KStream<String, String>[]> process() {
      return inputStream -> {
                    KStream<String, String> upperCaseString =
                            inputStream.mapValues((ValueMapper<String, String>)
                                    String::toUpperCase);
                    KStream<String, String>[] kStreams = new KStream[2];
                    kStreams[0] = upperCaseString.mapValues(v -> v + "postfix");
                    kStreams[1] = upperCaseString.mapValues(v -> "prefix" + v);
                    return kStreams;
                };
    }
    

    Then you can define your destinations as below:

    spring.cloud.stream.bindings.process-in-0.destination: <input-topic-name>
    spring.cloud.stream.bindings.process-out-0.destination: <postfix-output-topic-name>
    spring.cloud.stream.bindings.process-out-1.destination: <prefix-output-topic-name>
    

    With this approach, you get the end-to-end semantics from Kafka Streams, since the binder sends to the Kafka topics through Kafka Streams itself by calling the to method on the KStream behind the scenes.

    Option 3 - Using function composition

    Another option is function composition in the Kafka Streams binder. Keep in mind that this feature is not yet released, but the latest 3.1.x/3.2.x snapshots of the binder have it available. With that, you can define three simple functions as below.

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> uppercase() {
      return inputStream -> inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);
    }
    
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> postfixed() {
      return inputStream -> inputStream.mapValues(v -> v + "postfix");
    }
    
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> prefixed() { 
      return inputStream -> inputStream.mapValues(v -> "prefix" + v);
    }
    
    

    And then you can have two function composition flows as below:

    spring.cloud.function.definition: uppercase|postfixed;uppercase|prefixed
    

    You can set the input topics on each composed function binding as below; the output topics can be configured the same way (see the sketch after this snippet).

    spring.cloud.stream.bindings.uppercasepostfixed-in-0.destination=<input-topic>
    spring.cloud.stream.bindings.uppercaseprefixed-in-0.destination=<input-topic>
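
    Similarly, if you want explicit output topic names instead of the defaults, you can set the output bindings of the composed functions (a sketch; the topic names are the ones from the original topology):

    spring.cloud.stream.bindings.uppercasepostfixed-out-0.destination=with_postfix_topic
    spring.cloud.stream.bindings.uppercaseprefixed-out-0.destination=with_prefix_topic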
    
    

    With this function composition approach, you get the end-to-end semantics from Kafka Streams and avoid the extra intermediate topic. The downside, though, is that the uppercase function is invoked twice for each incoming record.

    The above approaches will work, but please consider the trade-offs before choosing one for your use case.