
How to handle exceptions while parsing JSON in Flink


I am reading data from Kafka with Flink 1.4.2 and parsing each record into an ObjectNode using JSONDeserializationSchema. If an incoming record is not valid JSON, my Flink job fails. I would like to skip the broken record instead of failing the job.

FlinkKafkaConsumer010<ObjectNode> kafkaConsumer =
                new FlinkKafkaConsumer010<>(TOPIC, new JSONDeserializationSchema(), consumerProperties);
DataStream<ObjectNode> messageStream = env.addSource(kafkaConsumer);
messageStream.print();

I get the following exception when a record in Kafka is not valid JSON:

Job execution switched to status FAILING.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'This': was expecting ('true', 'false' or 'null')
 at [Source: [B@4f522623; line: 1, column: 6]
Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Solution

  • As suggested by @twalthr, I implemented my own DeserializationSchema by copying JSONDeserializationSchema and adding exception handling.

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    public class CustomJSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
    
        // Created lazily on the task side, so it is not shipped with the serialized schema.
        private transient ObjectMapper mapper;
    
        @Override
        public ObjectNode deserialize(byte[] message) throws IOException {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
    
            try {
                return mapper.readValue(message, ObjectNode.class);
            } catch (Exception e) {
                // Instead of failing the job, wrap the raw payload in a marker node
                // that the downstream filter recognizes and drops.
                // Reuse the existing mapper and decode explicitly as UTF-8.
                ObjectNode errorObjectNode = mapper.createObjectNode();
                errorObjectNode.put("jsonParseError", new String(message, StandardCharsets.UTF_8));
                return errorObjectNode;
            }
        }
    
        @Override
        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }
    
    }
    

    Then, in my streaming job, I log and filter out the marker records:

    messageStream
            .filter((event) -> {
                if(event.has("jsonParseError")) {
                    LOG.warn("JsonParseException was handled: " + event.get("jsonParseError").asText());
                    return false;
                }
                return true;
            }).print();
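The "jsonParseError" key works, but it is a sentinel field: a legitimate record that happens to contain that key would be dropped as well. The following is a minimal sketch of the same catch-and-skip idea that uses an explicit result type instead of a magic field. It rests on stated assumptions: it is plain Java with no Flink or Jackson so it runs with the JDK alone, Integer.parseInt stands in for ObjectMapper.readValue, and ThrowingParser, ParseResult, safeParse and parseAndSkip are hypothetical names, not Flink APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: plain Java, no Flink or Jackson. Integer parsing stands in for
// JSON parsing so the example runs with the JDK alone.
public class SafeParseSketch {

    // Hypothetical parser interface; like ObjectMapper.readValue, it may throw
    // checked exceptions.
    @FunctionalInterface
    interface ThrowingParser<T> {
        T parse(String text) throws Exception;
    }

    // Result of one parse attempt: either a value or the rejected raw input.
    static final class ParseResult<T> {
        final boolean ok;
        final T value;        // null when ok == false
        final String raw;     // original record text, kept for logging
        final String error;   // exception description, null when ok == true

        private ParseResult(boolean ok, T value, String raw, String error) {
            this.ok = ok;
            this.value = value;
            this.raw = raw;
            this.error = error;
        }
    }

    // Turns a parse failure into a value instead of letting the exception escape.
    static <T> ParseResult<T> safeParse(byte[] message, ThrowingParser<T> parser) {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String raw = new String(message, StandardCharsets.UTF_8);
        try {
            return new ParseResult<>(true, parser.parse(raw), raw, null);
        } catch (Exception e) {
            return new ParseResult<>(false, null, raw, e.toString());
        }
    }

    // Keeps successfully parsed values and collects the raw text of bad records.
    static <T> List<T> parseAndSkip(List<byte[]> records, ThrowingParser<T> parser,
                                    List<String> rejected) {
        List<T> parsed = new ArrayList<>();
        for (byte[] record : records) {
            ParseResult<T> result = safeParse(record, parser);
            if (result.ok) {
                parsed.add(result.value);
            } else {
                // In the Flink job above, this is where the filter logs a warning.
                rejected.add(result.raw);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(
                "42".getBytes(StandardCharsets.UTF_8),
                "This is not JSON".getBytes(StandardCharsets.UTF_8),
                "7".getBytes(StandardCharsets.UTF_8));
        List<String> rejected = new ArrayList<>();
        ThrowingParser<Integer> parser = Integer::parseInt;
        System.out.println(parseAndSkip(records, parser, rejected)); // [42, 7]
        System.out.println(rejected);                                // [This is not JSON]
    }
}
```

In a Flink job, a wrapper like ParseResult could be the element type produced by the DeserializationSchema, with the downstream filter checking the ok flag rather than looking for a reserved key. As a simpler alternative, the Flink Kafka connector documentation describes returning null from deserialize() so that the consumer silently skips a corrupted message. Check that this behavior applies to your connector version before relying on it.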