Search code examples
javaapache-kafkaapache-kafka-streams

Kafka Streams: SerializationException: Size of data received by LongDeserializer is not 8


I have a small app to count the number of colors using Apache Kafka -

public class FavouriteColor {


    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";

    private static final String APPLICATION_ID = "favourite-colour-java";


    public static void main(String[] args) {

        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);

        KStream<String, String> usersAndColours = textLines
            .filter((key, value) -> value.contains(","))
            .selectKey((key, value) -> value.split(",")[0].toLowerCase())
            .mapValues(value -> value.split(",")[1].toLowerCase())
            .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));

        usersAndColours.to(INTERMEDIATE_TOPIC_NAME);
        KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);

        KTable<String, Long> favouriteColours = usersAndColoursTable
            .groupBy((user, colour) -> new KeyValue<>(colour, colour))
            .count(Named.as("CountsByColours"));

        favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        streams.cleanUp();
        streams.start();

        System.out.println(streams);

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The topics are created and producers/ consumers are started using the terminal:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact



kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic favourite-colour-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer



kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input

I provided the following inputs into the terminal:

stephane,blue
john,green
stephane,red
alice,red

I received the error in the consumer terminal:

stephane    Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519)
    at scala.Option.map(Option.scala:242)
    at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519)
    at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

What's the issue here? I did brief research and find similar questions asked by other people, but, the solutions seem not to work for me.


Solution

  • You defined the value deserializer to be that for Long, but it looks like your data is a String instead.