Tags: apache-kafka, apache-kafka-streams, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics


I am trying to do a simple POC with Kafka Streams, but I am getting an exception while starting the application. I am using Spring-Kafka and Kafka-Streams 2.5.1 with Spring Boot 2.3.5. My Kafka Streams configuration:

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processCCC() {
        return input -> input.peek((key, value) -> log
                .info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    /*
    @Bean
    public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @Bean
    public Topology kafkaStreamTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
        return streamsBuilder.build();
    } */
}

The application.yaml is configured as below. The idea is that I have 3 input and 3 output topics; the component reads from each input topic and writes to the corresponding output topic.

spring:
  application.name: consumerapp-1
  cloud:
    function:
      definition: processAAA;processBBB;processCCC
    stream:
      kafka.binder: 
          brokers: 127.0.0.1:9092
          autoCreateTopics: true
          auto-add-partitions: true
      kafka.streams.binder:
          configuration: 
            commit.interval.ms: 1000
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        processAAA-in-0:
          destination: aaaInputTopic
        processAAA-out-0:
          destination: aaaOutputTopic
        processBBB-in-0:
          destination: bbbInputTopic
        processBBB-out-0:
          destination: bbbOutputTopic
        processCCC-in-0:
          destination: cccInputTopic
        processCCC-out-0:
          destination: cccOutputTopic

The exception thrown is:

Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced)  but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)

Can anyone help me with Kafka Streams / Spring-Kafka code samples for processing with multiple input and output topics?

Update: 21-Jan-2021

After removing the kafkaStreams and kafkaStreamTopology bean configuration, I am getting the message below in an infinite loop, and message consumption is still not working. I have checked the subscriptions in application.yaml against the @Bean function definitions; they all look fine to me, but I still get this cross-wiring error. I have replaced application.properties with the application.yaml above.

    2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription

Solution

  • I have managed to solve the problem; I am writing this up for the benefit of others. If you want to include multiple streams in a single app jar, the key is to define a separate application id for each of your streams. I knew this all along, but I was not aware of how to define it; the answer is something I managed to dig out of the SCSt documentation. The root cause of the warning above is that each Kafka Streams topology joins the consumer group named after its application id, so when all functions fall back to the same default id (here consumerapp-1), their consumers join one group while subscribing to different topics. Below is how the application.yaml can be defined:

    spring:
      application.name: kafkaMultiStreamConsumer
      cloud:
        function:
          definition: processAAA;processBBB;processCCC # selects the functional beans to bind; not needed for the imperative @StreamListener style
        stream:
          kafka: 
            binder:
              brokers: 127.0.0.1:9092
              min-partition-count: 3
              replication-factor: 2
              transaction:
                transaction-id-prefix: transaction-id-2000
              autoCreateTopics: true
              auto-add-partitions: true
            streams:
              binder:
                functions: # needed for the functional style
                  processBBB: 
                    application-id: SampleBBBapplication
                  processAAA: 
                    application-id: SampleAAAapplication
                  processCCC: 
                    application-id: SampleCCCapplication
                configuration: 
                  commit.interval.ms: 1000            
                  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde        
          bindings:
            # Below is for the imperative style, using the @StreamListener
            # and @SendTo annotations in the .java classes
            inputAAA:
              destination: aaaInputTopic
            outputAAA:
              destination: aaaOutputTopic
            inputBBB:
              destination: bbbInputTopic
            outputBBB:
              destination: bbbOutputTopic
            inputCCC:
              destination: cccInputTopic
            outputCCC:
              destination: cccOutputTopic
            # Below is for the functional style, using Function<KStream...> beans.
            # Use either one of the two styles; both are not required. If you use
            # both it is ok, but only one of them takes effect: from what I have
            # seen, @StreamListener is always the one triggered.
            processAAA-in-0:
              destination: aaaInputTopic
              group: processAAA-group
            processAAA-out-0:
              destination: aaaOutputTopic
              group: processAAA-group
            processBBB-in-0:
              destination: bbbInputTopic
              group: processBBB-group
            processBBB-out-0:
              destination: bbbOutputTopic
              group: processBBB-group
            processCCC-in-0:
              destination: cccInputTopic
              group: processCCC-group
            processCCC-out-0:
              destination: cccOutputTopic
              group: processCCC-group
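
    For reference, the same per-function application ids can also be expressed in flat application.properties form (a minimal sketch, assuming the same function names as above; the binder documentation spells the key applicationId, and Spring's relaxed binding accepts application-id as well):

    spring.cloud.stream.kafka.streams.binder.functions.processAAA.application-id=SampleAAAapplication
    spring.cloud.stream.kafka.streams.binder.functions.processBBB.application-id=SampleBBBapplication
    spring.cloud.stream.kafka.streams.binder.functions.processCCC.application-id=SampleCCCapplication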
    

    Once the above is defined, we need to define the individual Java classes where the stream processing logic is implemented. Your Java class can look like the one below; create similar classes for the other 2 (or N) streams as per your requirement. One example, AAASampleStreamTask.java:

    import org.apache.kafka.streams.kstream.KStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Component;

    @Component
    @EnableBinding(AAASampleChannel.class) // One channel interface corresponding to the in-topic and out-topic
    public class AAASampleStreamTask {
        private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);
    
        @StreamListener(AAASampleChannel.INPUT)
        @SendTo(AAASampleChannel.OUTPUT)
        public KStream<String, String> processAAA(KStream<String, String> input) {
            input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
           ...
           // do other business logic
           ...
            return input;
        }
        
        /**
         * Use either the style above or the one below. The style below is the
         * newer one, available starting from SCSt 3.0 if I am not mistaken.
         * These are 2 different styles of consuming Kafka Streams using SCSt.
         * If both are present, the one above takes priority, as per my observation.
         */
        /*
        @Bean
        public Function<KStream<String, String>, KStream<String, String>> processAAA() {
            // ... do other business logic here ...
            return input -> input.peek((key, value) -> log.info(
                    "Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}",
                    String.valueOf(System.currentTimeMillis())));
        }
        */
    }
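
    If you prefer the functional style end-to-end, no channel interface is needed at all. Below is a minimal standalone sketch (the class name AAASampleStreamConfig is just an example; it assumes the processAAA-in-0/processAAA-out-0 bindings and the SampleAAAapplication application id from the YAML above):

    import java.util.function.Function;

    import org.apache.kafka.streams.kstream.KStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class AAASampleStreamConfig {
        private static final Logger log = LoggerFactory.getLogger(AAASampleStreamConfig.class);

        // Bound by the binder to processAAA-in-0 / processAAA-out-0
        @Bean
        public Function<KStream<String, String>, KStream<String, String>> processAAA() {
            return input -> input.peek((key, value) ->
                    log.info("Functional AAA Cloud Stream Kafka Stream processing : {}", value));
        }
    }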
    

    The channel interface is required only if you want to go with the imperative style of programming; it is not needed for the functional style. AAASampleChannel.java:

    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;

    public interface AAASampleChannel {
        String INPUT = "inputAAA";
        String OUTPUT = "outputAAA";
    
        @Input(INPUT)
        KStream<String, String> inputAAA();
    
        @Output(OUTPUT)
        KStream<String, String> outputAAA();
    }
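
    The BBB and CCC streams follow the same pattern with their respective channel names and binding destinations. Note that, if I remember correctly, @EnableBinding and @StreamListener are deprecated as of Spring Cloud Stream 3.1 in favor of the functional style, so for new code the Function<KStream...> beans shown above are the recommended approach.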