I published a JSON object to my "commits" topic. I want to consume the messages using Kafka Streams, but I get an error:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public Serde<Commit> commitSerde() {
        return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
    }

    @Bean
    public KStream<String, Commit> kStream(StreamsBuilder builder) {
        KStream<String, Commit> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));
        KTable<String, Long> commitsCount = stream
                .mapValues(Commit::getAuthorName)
                .selectKey((key, word) -> word)
                .groupByKey()
                .count(Materialized.as("Counts"));
        commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));
        return stream;
    }
}
The log says:
Exception in thread "test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.common.serialization.Serdes$WrapperSerde
Caused by: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde
Caused by: java.lang.NoSuchMethodException
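Your problem is the registration of StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG. First, it is not needed for reading the topic in your example, because you already specify the value serde in the Consumed you pass when creating the KStream. You could simply leave out the default serde. Note, however, that groupByKey() after selectKey() repartitions String values and falls back to the default value serde unless you give it explicit serdes (in recent Kafka versions, for example, Grouped.with(Serdes.String(), Serdes.String())).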
If you register a class as the default serde, Kafka Streams will at some point create an instance of that class via reflection, which calls the class's no-argument constructor. In your example, the registered class is org.apache.kafka.common.serialization.Serdes$WrapperSerde, the type returned by Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class)). This class has no such constructor, which causes the exception.
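The failure mode can be reproduced without Kafka. The following is a minimal sketch using only the JDK; PairWrapper, FixedWrapper, newInstance and canInstantiate are hypothetical names made up for this illustration, and the helper only approximates what a framework does when it is handed a class instead of an instance.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-ins for illustration; none of these are Kafka classes.
class SerdeReflectionSketch {

    // Plays the role of Serdes.WrapperSerde: it only has a two-argument constructor.
    public static class PairWrapper {
        final String serializer;
        final String deserializer;

        public PairWrapper(String serializer, String deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Plays the role of a small subclass whose no-argument constructor
    // supplies the two arguments itself.
    public static class FixedWrapper extends PairWrapper {
        public FixedWrapper() {
            super("json-serializer", "json-deserializer");
        }
    }

    // Builds an instance from the class alone, as a framework must do
    // when its configuration contains only a class or a class name.
    static Object newInstance(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            return ctor.newInstance();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(
                "Could not find a no-argument constructor for " + type.getName(), e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
    }

    // Returns true if the class can be created through newInstance above.
    static boolean canInstantiate(Class<?> type) {
        try {
            newInstance(type);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(PairWrapper.class));  // prints false
        System.out.println(canInstantiate(FixedWrapper.class)); // prints true
    }
}
```

The class with only a two-argument constructor cannot be created from its Class object alone, while the subclass with a no-argument constructor can. The same distinction applies to the serde class registered as a default.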
If you want to register a default serde for your Commit type, you need to wrap it in a small class:
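// WrapperSerde is a nested class of org.apache.kafka.common.serialization.Serdes.
public class CommitSerde extends Serdes.WrapperSerde<Commit> {

    // Public no-argument constructor, so Kafka Streams can instantiate the class via reflection.
    public CommitSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
    }
}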
In your example, this class should be suitable for registration as the default value serde with props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommitSerde.class.getName()).