I have a Java Spring Kafka application that sends objects of type Book to a Kafka topic. I'm using Kafka Streams to re-key each message by the Book's author, then aggregate into a KTable that maps each key to the number of messages seen with that key. The table is then streamed out to an output Kafka topic.
Book Model:
import java.time.LocalTime;
import java.util.UUID;

import lombok.Data;

@Data // Lombok generates getters/setters, equals/hashCode, toString
public class Book {
    private UUID id;
    private String name;
    private String author;
    private LocalTime date;
}
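(BookSerde is referenced below but omitted here; for reference, a minimal sketch of such a serde, assuming a Jackson-based JSON wire format, could look like the following. The JavaTimeModule registration is needed for the LocalTime field.)

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical JSON serde for Book; the actual implementation may differ
public class BookSerde implements Serde<Book> {
    private static final ObjectMapper MAPPER =
            new ObjectMapper().registerModule(new JavaTimeModule());

    @Override
    public Serializer<Book> serializer() {
        return (topic, book) -> {
            try {
                return MAPPER.writeValueAsBytes(book);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize Book", e);
            }
        };
    }

    @Override
    public Deserializer<Book> deserializer() {
        return (topic, bytes) -> {
            try {
                return MAPPER.readValue(bytes, Book.class);
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize Book", e);
            }
        };
    }
}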
Stream structure:
StreamsBuilder streamsBuilder = new StreamsBuilder();

// Read Books from the input topic with String keys and Book values
KStream<String, Book> stream = streamsBuilder.stream("input_topic", Consumed.with(Serdes.String(), new BookSerde()));
// Re-key each record by the book's author
stream = stream.selectKey((key, value) -> value.getAuthor());
// Group by author and count; note that groupBy re-keys on its own, so the selectKey
// above is redundant, and groupBy's internal repartition topic uses the default serdes
KTable<String, Long> keyWithCount = stream.groupBy((key, value) -> value.getAuthor()).count();
// Stream the running counts to the output topic as <String, Long>
keyWithCount.toStream().to("output_topic", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), config);
kafkaStreams.start();
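(The config object isn't shown above; for reference, a typical Streams configuration, assuming a local broker and an arbitrary application id, would be along these lines. The default serdes matter here because groupBy() writes Book values through an internal repartition topic unless overridden with Grouped.with(...).)

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
// Assumed values; application.id also names the consumer group and internal topics
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "book-count-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default serdes, used by groupBy()'s repartition topic
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, BookSerde.class);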
After running the app, the output shows each unique key, but random symbols instead of a count.
A peculiar thing I found after messing with .peek(): I printed the key and value right before sending to the output topic, and it shows that the stream works the way it should! The problem, however, is that the kafka-console-consumer on that topic doesn't show numbers for the count, but rather random symbols such as a diamond, a heart, a smiley face, etc.
For example, after sending a Book with "A" as its author to the input topic, my streaming app counts it as the first book and prints "A 1", but in the kafka-console-consumer it shows up as "A ☺".
My findings with peek() should have been the giveaway. The issue lay not in the code but in the configuration of kafka-console-consumer: its default value deserializer treats the bytes as a string and can't make sense of a long. A long is serialized as 8 raw bytes, and for a count of 1 the last byte is 0x01, which some consoles render as a smiley face (code page 437, for instance, maps 0x01 to ☺, 0x03 to ♥, and 0x04 to ♦). To fix it, I had to add --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"
when starting the consumer.
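For completeness, the full invocation (assuming a local broker and the topic name from above) looks something like:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic output_topic \
    --from-beginning \
    --property print.key=true \
    --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

The key still goes through the default StringDeserializer, which is why "A" was readable all along.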