I am using a kafka consumer with the below properties:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer
A KafkaProducer(value.serializer=org.apache.kafka.connect.json.JsonSerializer) is pushing JSON records into a Topic and this Consumer is reading from it, Functionality-wise its working fine, but problem comes up when my producer pushes a non-JSON message (eg: empty message).
In this case, The consumer is going down and it will not consume until that empty message is cleared(I've reset the offset of the consumer group to latest).
Is there any way to handle this, maybe using some property or something like that
The Consumer API has no deserialization exception handling properties like Kafka Streams does
You'll need to create your own Deserializer that wraps the json one and handles any errors
You may find the SafeDeserializer class in azkarra-commons to be useful