Search code examples
javaapache-kafka-streamsspring-cloud-streamspring-cloud-stream-binder-kafka

Kafka streams GlobalKTable throws Deserialization exception on Tombstone - null value- records


I have a Spring cloud stream based Kafka Streams application where I'm binding a Global KTable to a Compact topic. When I push a Tombstone record to the topic (Non-empty key with null value) - my Kafka streams application fails with Deserialization exception. The failure is because my deserializer does not handle null records.

From the documentation, I thought GlobalKTable will not even "see" null-value records. Is it not the case? Do I need to handle null records in the deserializer?

org.apache.kafka.common.errors.SerializationException: Unable to deserialize
Caused by: java.lang.IllegalArgumentException: argument "src" is null
    at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
    at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
    at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)


Solution

  • Yes; you have to check for null and return null. See any of the standard deserializers.

    Unlike the KafkaConsumer 's Fetcher (which checks for null before calling), the kafka-streams calls it unconditionally. See

     at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)