Search code examples
spring-bootapache-kafkaapache-kafka-streamsspring-kafka

Kafka Stream: org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig


I'm learning Kafka Streams and I'm getting an error, I have tried a few things but noting works

Input : value_1, value_2, value_3 ...............

public static void main(String[] args) throws InterruptedException {
            String host = "127.0.0.1:9092";
            String consumer_group = "firstGroup1";
            String topic = "test1";
    
    //        create properties
            Properties properties = new Properties();
            properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, host);
            properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, consumer_group);
            properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
            properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    
    
    
            //        create a topology
            StreamsBuilder builder = new StreamsBuilder();
    
    
            //        input topic
            KStream<String, String> inputtopic = builder.stream(topic);
    
            //        filter the value
            KStream<String, String> filtered_stream = inputtopic.filter((k, v) -> ((v.equalsIgnoreCase("value_5")) || (v.equalsIgnoreCase("value_7")) || (v.equalsIgnoreCase("value_9"))));
            filtered_stream.foreach((k, v) -> System.out.println(v));
            //        output topic set
            filtered_stream.to("prime_value");
    
    
    //        build a topology
            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
    
    
    //        start our stream system
            kafkaStreams.start();
    
    
        }

Error message

    1800 [main] INFO org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [firstGroup1-d1244e8e-dbc1-4139-8876-ca75cb89c609-StreamThread-1-consumer] Cooperative rebalancing enabled now
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1641673582569
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig(Ljava/util/Map;Lorg/apache/kafka/common/serialization/Deserializer;Lorg/apache/kafka/common/serialization/Deserializer;)Ljava/util/Map;
    at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:537)
    at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:535)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:527)
    at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:406)
    at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:897)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:887)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:783)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:693)
    at com.example.kafkastreams.main(kafkastreams.java:47)

Line no 47: { KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);}


Solution

  • It looks like you have mismatched versions of kafka-clients and kafka-streams - they must be the same version.

    When using Spring Boot; you should not add versions for the kafka dependencies; Boot will bring in the correct versions of both libraries.