I'm learning Kafka Streams and I'm getting an error. I have tried a few things, but nothing works.
Input: value_1, value_2, value_3, ...
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public static void main(String[] args) throws InterruptedException {
    String host = "127.0.0.1:9092";
    String consumer_group = "firstGroup1";
    String topic = "test1";
    // create properties
    Properties properties = new Properties();
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, host);
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, consumer_group);
    properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    // create a topology
    StreamsBuilder builder = new StreamsBuilder();
    // input topic
    KStream<String, String> inputtopic = builder.stream(topic);
    // keep only the records whose value is value_5, value_7 or value_9
    KStream<String, String> filtered_stream = inputtopic.filter((k, v) ->
            v.equalsIgnoreCase("value_5")
            || v.equalsIgnoreCase("value_7")
            || v.equalsIgnoreCase("value_9"));
    filtered_stream.foreach((k, v) -> System.out.println(v));
    // write the filtered records to the output topic
    filtered_stream.to("prime_value");
    // build the topology
    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
    // start the streams application
    kafkaStreams.start();
}
Error message:
1800 [main] INFO org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [firstGroup1-d1244e8e-dbc1-4139-8876-ca75cb89c609-StreamThread-1-consumer] Cooperative rebalancing enabled now
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
1852 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1641673582569
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.ConsumerConfig.addDeserializerToConfig(Ljava/util/Map;Lorg/apache/kafka/common/serialization/Deserializer;Lorg/apache/kafka/common/serialization/Deserializer;)Ljava/util/Map;
at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:537)
at org.apache.kafka.streams.processor.internals.StreamThread$InternalConsumerConfig.<init>(StreamThread.java:535)
at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:527)
at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:406)
at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:897)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:887)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:783)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:693)
at com.example.kafkastreams.main(kafkastreams.java:47)
Line 47 is: KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
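It looks like you have mismatched versions of kafka-clients and kafka-streams; they must be the same version. The NoSuchMethodError means kafka-streams was compiled against a kafka-clients version that has ConsumerConfig.addDeserializerToConfig, but the kafka-clients jar on your classpath (3.0.0, according to the log) does not have that method.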
When using Spring Boot, you should not add versions for the Kafka dependencies; Boot will bring in the correct versions of both libraries.
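If you want to confirm the mismatch before changing the build, one option is to print where each library is loaded from at runtime. The following is only a minimal sketch: the class name KafkaJarCheck and the helper locate are made up for illustration, and it assumes the jars keep their usual build-tool file names (for example kafka-clients-3.0.0.jar), so that the version is visible in the printed path. It looks the classes up by name, so it compiles even without Kafka on the classpath.

```java
import java.security.CodeSource;

public class KafkaJarCheck {

    // Hypothetical helper: returns the location (usually a jar path) a class
    // was loaded from, or a short explanation if that cannot be determined.
    static String locate(String className) {
        try {
            // initialize=false: only load the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, KafkaJarCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "no code source (likely a JDK class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // One class from each library; the two paths should show the same version.
        System.out.println("kafka-clients: "
                + locate("org.apache.kafka.clients.consumer.ConsumerConfig"));
        System.out.println("kafka-streams: "
                + locate("org.apache.kafka.streams.KafkaStreams"));
    }
}
```

Run it with the same classpath as your application. If the two printed paths show different versions, that would be consistent with the mismatch described above. Your build tool's dependency report should then show which dependency declaration pulls in the odd one.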