apache-flink flink-streaming

Is JSONDeserializationSchema() deprecated in Flink?


I am new to Flink and am doing something very similar to the question linked below.

Cannot see message while sinking kafka stream and cannot see print message in flink 1.2

I am also trying to use JSONDeserializationSchema() as the deserializer for my Kafka input, which is a JSON message without a key.

However, I found that JSONDeserializationSchema() is not available.

Please let me know if I am doing something wrong.



Solution

  • JSONDeserializationSchema was removed in Flink 1.8, after having been deprecated earlier.

    The recommended approach is to write a deserializer that implements DeserializationSchema<T>. Here's an example, which I've copied from the Flink Operations Playground:

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    
    import java.io.IOException;
    
    /**
     * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
     */
    public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
    
        private static final long serialVersionUID = 1L;
    
        private static final ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public ClickEvent deserialize(byte[] message) throws IOException {
            return objectMapper.readValue(message, ClickEvent.class);
        }
    
        @Override
        public boolean isEndOfStream(ClickEvent nextElement) {
            // A Kafka topic is unbounded, so no element ever marks the end of the stream.
            return false;
        }
    
        @Override
        public TypeInformation<ClickEvent> getProducedType() {
            return TypeInformation.of(ClickEvent.class);
        }
    }
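    
    The schema above relies on a ClickEvent class that isn't shown here. With its default settings, Jackson needs a no-argument constructor plus setters (or public fields) to populate an object, and Flink's POJO type analysis has similar requirements: a public class, a public no-argument constructor, and fields that are public or reachable through getters and setters. Below is a minimal sketch of such a class. The field names page and timestamp are hypothetical, chosen only for illustration, and are not necessarily those of the playground's real ClickEvent:

    ```java
    /**
     * Hypothetical event type for illustration only; the field names are
     * assumptions and must match the keys of your own JSON messages.
     */
    public class ClickEvent {

        // Private fields are fine as long as getters and setters exist.
        private String page;
        private long timestamp;

        // A public no-argument constructor lets Jackson (and Flink's POJO
        // analysis) instantiate the class before filling in the fields.
        public ClickEvent() {
        }

        public String getPage() {
            return page;
        }

        public void setPage(String page) {
            this.page = page;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "ClickEvent{page='" + page + "', timestamp=" + timestamp + "}";
        }
    }
    ```

    With this shape, a message such as {"page": "/index", "timestamp": 1546300800000} would map onto the two fields. Note that with Jackson's default configuration, a JSON key that has no matching property causes readValue to throw, so the class and the messages need to agree.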
    

    For a Kafka producer, you'll want to implement KafkaSerializationSchema<T>; you'll find examples of that in the same project.