I have a small Kafka Streams app that counts users' favourite colours:
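import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class FavouriteColor {
    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";
    private static final String APPLICATION_ID = "favourite-colour-java";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable record caching so every count update is forwarded downstream
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        StreamsBuilder builder = new StreamsBuilder();
        // Input lines look like "user,colour"
        KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);
        // Re-key by user, keep only the colour, and drop unsupported colours
        KStream<String, String> usersAndColours = textLines
                .filter((key, value) -> value.contains(","))
                .selectKey((key, value) -> value.split(",")[0].toLowerCase())
                .mapValues(value -> value.split(",")[1].toLowerCase())
                .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));

        // Write (user, colour) pairs out and read them back as a table of each user's latest colour
        usersAndColours.to(INTERMEDIATE_TOPIC_NAME);
        KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);

        // Count users per colour; when a user changes colour, the old colour is decremented
        KTable<String, Long> favouriteColours = usersAndColoursTable
                .groupBy((user, colour) -> new KeyValue<>(colour, colour))
                .count(Named.as("CountsByColours"));
        favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        // Delete the local state directory before starting (development convenience)
        streams.cleanUp();
        streams.start();
        System.out.println(streams);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}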
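The counting this topology is meant to perform can be sketched without Kafka at all. The following is a hypothetical in-memory model (the class and method names are illustrative and not part of any Kafka API): a map from user to latest colour stands in for the KTable read from the intermediate topic, and a per-colour counter stands in for `groupBy(...).count()`, which subtracts one from a user's previous colour and adds one to the new colour whenever that user's record is updated.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the intended counting; it does not use Kafka Streams.
class FavouriteColourModel {

    private static final List<String> ALLOWED = List.of("green", "blue", "red");

    // Latest colour per user (stands in for the KTable built from the intermediate topic).
    private final Map<String, String> colourByUser = new HashMap<>();
    // Number of users per colour (stands in for groupBy(...).count()); TreeMap keeps output sorted.
    private final Map<String, Long> countByColour = new TreeMap<>();

    void process(String line) {
        String[] parts = line.split(",");
        if (parts.length < 2) {
            return; // skip lines without a "user,colour" pair
        }
        String user = parts[0].toLowerCase();
        String colour = parts[1].toLowerCase();
        if (!ALLOWED.contains(colour)) {
            return; // same colour filter as the topology
        }
        String previous = colourByUser.put(user, colour);
        if (previous != null) {
            countByColour.merge(previous, -1L, Long::sum); // retract the user's old colour
        }
        countByColour.merge(colour, 1L, Long::sum); // count the user's new colour
    }

    Map<String, Long> counts() {
        return countByColour;
    }

    public static void main(String[] args) {
        FavouriteColourModel model = new FavouriteColourModel();
        for (String line : List.of("stephane,blue", "john,green", "stephane,red", "alice,red")) {
            model.process(line);
        }
        System.out.println(model.counts()); // prints {blue=0, green=1, red=2}
    }
}
```

Fed the four sample records used in this question, the model ends with blue=0, green=1, red=2. Under the assumption that the real topology behaves like this model, those are the values the latest record for each colour key in favourite-colour-output should carry once the app works, and every one of them is a Long.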
I created the topics and started the consumer and producer from the terminal:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic favourite-colour-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input
I then entered the following records in the producer terminal:
stephane,blue
john,green
stephane,red
alice,red
The consumer terminal then printed this error:
stephane Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519)
at scala.Option.map(Option.scala:242)
at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519)
at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
What is the issue here? I did some brief research and found similar questions from other people, but their solutions don't seem to work for me.
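You configured the console consumer's value deserializer as LongDeserializer, but the records it reads have String values. They come from your own topology: INTERMEDIATE_TOPIC_NAME is set to "favourite-colour-output", the same topic as OUTPUT_TOPIC_NAME, so usersAndColours.to(INTERMEDIATE_TOPIC_NAME) writes (user, colour) String pairs into the output topic. The first of these is (stephane, blue), whose value is 4 bytes long, which matches what you see: the key stephane is printed and then the value fails with "Size of data received by LongDeserializer is not 8". Set INTERMEDIATE_TOPIC_NAME to the compacted topic you created, user-keys-and-colours, and start again with a fresh output topic (for example, delete and recreate favourite-colour-output), because the String records already written there will keep breaking a consumer that reads with --from-beginning.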
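To see why the length check fails, here is a small stand-alone sketch. `decodeLong` is a hypothetical helper, not Kafka's LongDeserializer, but it applies the same kind of check the error message describes: a long serialized with `Serdes.Long()` occupies exactly 8 big-endian bytes, while the String value "blue" written with the default String serde is 4 UTF-8 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the 8-byte length check; it is not the Kafka class itself.
class LongDecodeDemo {

    static long decodeLong(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes for a long, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong(); // ByteBuffer reads big-endian by default
    }

    public static void main(String[] args) {
        // What a count value looks like on the wire: 8 big-endian bytes
        byte[] countBytes = ByteBuffer.allocate(Long.BYTES).putLong(2L).array();
        System.out.println(decodeLong(countBytes)); // prints 2

        // What a colour value looks like on the wire: its UTF-8 bytes
        byte[] colourBytes = "blue".getBytes(StandardCharsets.UTF_8);
        System.out.println(colourBytes.length); // prints 4
        try {
            decodeLong(colourBytes);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints expected 8 bytes for a long, got 4
        }
    }
}
```

Any String value whose UTF-8 encoding happens to be exactly 8 bytes would pass such a check and decode to a meaningless number, so a mismatched deserializer does not always fail loudly; here "blue" is 4 bytes, so it does.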