Apache Flink - How to implement a custom deserializer with DeserializationSchema


I'm working with Flink and I'm using the Kafka connector. The messages I'm receiving from Kafka are lists of comma-separated items: "'a','b','c',1,0.1 ....'12:01:00.000'". One of these items contains the event time. I would like to use this event time for per-partition watermarking (in the Kafka source) and then for session windowing. My case is a bit different from the usual one because, from what I have understood, people usually use Kafka timestamps and SimpleStringSchema(). In my case, I instead have to write my own deserializer that implements DeserializationSchema and returns a Tuple or POJO, basically replacing SimpleStringSchema() with my own function. Flink offers some deserializers out of the box, but I really don't understand how to write custom deserialization logic.
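For illustration, here is a minimal sketch of turning the event-time field of such a line into the epoch-millisecond `long` that Flink timestamps and watermarks work with. The class and method names are hypothetical, and because the sample value `'12:01:00.000'` carries no date, the sketch assumes the caller supplies the date and that times are in UTC.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

public class EventTimeParser {

    // Converts a field such as "'12:01:00.000'" into epoch milliseconds.
    // ASSUMPTION: the message holds only a time of day, so the date is passed in,
    // and the time is interpreted as UTC.
    public static long toEpochMillis(String field, LocalDate date) {
        String time = field.trim();
        // Strip the surrounding single quotes, if present
        if (time.length() >= 2 && time.startsWith("'") && time.endsWith("'")) {
            time = time.substring(1, time.length() - 1);
        }
        // ISO local time format, e.g. 12:01:00.000
        LocalTime localTime = LocalTime.parse(time);
        return date.atTime(localTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        long ts = toEpochMillis("'12:01:00.000'", LocalDate.of(1970, 1, 1));
        System.out.println(ts); // 43260000
    }
}
```

If the real messages do include a date, parsing the full date-time directly would remove the need for the date assumption.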

Checking the Flink website, I found this:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html

I have been given an example (thanks, David!), but I still don't understand how to implement mine.

https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java

What I really need is an example of how to do this for a list. The example above is for JSON, so it gives me the theory and the concept, but I got stuck there.


Solution

  • You should introduce a POJO, for example:

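    import java.io.Serializable;

    public class Event implements Serializable {
        // ... other fields parsed from the message
        private Long timestamp; // event time, e.g. epoch milliseconds
        public Long getTimestamp() { return timestamp; } // getter/setter let Flink treat Event as a POJO
        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    }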
    

    Then implement a simple deserializer similar to the one from the link. You can parse the line either by manually splitting the message string on commas, or by using an out-of-the-box CSV reader such as opencsv, to populate your POJO:

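    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class EventDeserializationSchema implements DeserializationSchema<Event> {
    
        private static final long serialVersionUID = 1L;
    
        @Override
        public Event deserialize(byte[] message) throws IOException {
            // Decode the raw Kafka record bytes into the CSV line
            String line = new String(message, StandardCharsets.UTF_8);
            // Naive split: quoted values such as 'a' keep their quotes,
            // and a comma inside a quoted value would split it
            String[] parts = line.split(",");
            
            Event event = new Event();
            // TODO: map parts to the Event fields here
            return event;
        }
    
        @Override
        public boolean isEndOfStream(Event nextElement) {
            // Never treat a record as the end of the stream
            return false;
        }
    
        @Override
        public TypeInformation<Event> getProducedType() {
            // Tells Flink which type this schema produces
            return TypeInformation.of(Event.class);
        }
    }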
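If a quoted value can itself contain a comma, `line.split(",")` breaks it apart. A minimal hand-rolled alternative, assuming single quotes are the only quoting character and quotes are never escaped inside a value (assumptions about the message format), could look like this. The class name is hypothetical; its output could be mapped onto the `Event` fields in place of the TODO, with numeric fields converted via `Integer.parseInt` or `Double.parseDouble`.

```java
import java.util.ArrayList;
import java.util.List;

public class QuotedCsvSplitter {

    // Splits a line on commas outside single quotes and drops the quotes.
    // ASSUMPTION: ' is the only quote character and quotes are never escaped.
    public static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '\'') {
                inQuotes = !inQuotes;          // toggle quoted state, drop the quote itself
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString().trim());
                current.setLength(0);          // start the next field
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString().trim()); // the last field has no trailing comma
        return fields;
    }

    public static void main(String[] args) {
        List<String> parts = split("'a','b,c',1,0.1,'12:01:00.000'");
        System.out.println(parts); // [a, b,c, 1, 0.1, 12:01:00.000]
    }
}
```

For messages with more complex quoting rules, a CSV library such as opencsv is the more robust choice.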