Tags: java, apache-flink, flink-streaming, jackson-dataformat-xml

Flink: how to use FasterXML jackson-dataformats-text to convert CSV to a POJO


I'm receiving CSV data in my class and I need to extract the values to create a POJO. I don't need to open a "file.csv" from a directory: the comma-separated elements are passed by Flink to the EventDeserializationSchema, which in turn uses the "Event Class" to process each event.

Here is an example:

IN: "'Adam','Smith',66,....'12:01:00.000'" -> OUT: pojo

To do this I'm using: https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv

This is my Event class, which should do the trick, but at the moment it isn't doing anything.

import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {

    CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();
    
    CsvSchema schema = CsvSchema.emptySchema().withHeader();

    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
    
    return Pojo
}

This is my Pojo class:

public class Pojo {
    
        public String firstName;
        public String lastName;
        private int age;
        public String time;

        public Pojo(String firstName, String lastName, int age, String time) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
            this.time =time;
            
        }

}

Any help to get the class to return a Pojo would be really appreciated.

This is an example with JSON: https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java

Example event class (KafkaEvent): https://github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaEvent.java#L27


Solution

  • To make this work, the target class needs a default (no-argument) constructor and getters/setters for its fields. I'm not sure what Event is meant to do or why there is also a separate Pojo, but assuming you want to deserialize the incoming string into an Event, something like this should work:

    1. Event POJO class:
    public class Event implements Serializable {
        public String firstName;
        public String lastName;
        private int age;
        public String time;
    
        public Event() {
        }
    
        public String getFirstName() {
            return firstName;
        }
    
        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    
        public String getLastName() {
            return lastName;
        }
    
        public void setLastName(String lastName) {
            this.lastName = lastName;
        }
    
        public int getAge() {
            return age;
        }
    
        public void setAge(int age) {
            this.age = age;
        }
    
        public String getTime() {
            return time;
        }
    
        public void setTime(String time) {
            this.time = time;
        }
    }
    
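    2. EventDeserializationSchema from this question with deserialize() implemented
    // Jackson imports use Flink's shaded package, as in the question.
    import java.io.IOException;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

    public class EventDeserializationSchema implements DeserializationSchema<Event> {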
    
        private static final long serialVersionUID = 1L;
    
        private static final CsvSchema schema = CsvSchema.builder()
                .addColumn("firstName")
                .addColumn("lastName")
                .addColumn("age", CsvSchema.ColumnType.NUMBER)
                .addColumn("time")
                .build();
    
        private static final ObjectMapper mapper = new CsvMapper();
    
        @Override
        public Event deserialize(byte[] message) throws IOException {
            return mapper.readerFor(Event.class).with(schema).readValue(message);
        }
    
        @Override
        public boolean isEndOfStream(Event nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<Event> getProducedType() {
            return TypeInformation.of(Event.class);
        }
    }
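
One detail worth checking against the sample input in the question: its string fields are wrapped in single quotes ('Adam','Smith',...), while Jackson's CSV module treats the double quote as the quote character by default. The sketch below is a standalone way to try the same schema outside Flink, with the quote character set to a single quote. It is only an illustration under a few assumptions: the class name SingleQuotedCsvDemo and its nested Event with public fields are made up for brevity, the sample line is the question's example with the elided middle left out, and the imports use the plain com.fasterxml.jackson packages (jackson-dataformat-csv on the classpath) rather than Flink's shaded ones.

```java
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original answer.
public class SingleQuotedCsvDemo {

    // Simplified stand-in for the Event POJO (public fields, no-arg constructor).
    public static class Event {
        public String firstName;
        public String lastName;
        public int age;
        public String time;

        public Event() {
        }
    }

    // Same column order as in the answer, plus a single-quote quote character
    // so that 'Adam' is read as Adam rather than with the quotes included.
    private static final CsvSchema SCHEMA = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .setQuoteChar('\'')
            .build();

    // ObjectReader is immutable, so it can be built once and reused.
    private static final ObjectReader READER =
            new CsvMapper().readerFor(Event.class).with(SCHEMA);

    // Mirrors what deserialize(byte[] message) does in the schema class.
    public static Event parse(byte[] message) throws IOException {
        return READER.readValue(message);
    }

    public static void main(String[] args) throws IOException {
        byte[] line = "'Adam','Smith',66,'12:01:00.000'"
                .getBytes(StandardCharsets.UTF_8);
        Event event = parse(line);
        System.out.println(event.firstName + " " + event.lastName
                + " " + event.age + " " + event.time);
    }
}
```

If the real records do use single quotes, the same setQuoteChar('\'') call could be added to the schema builder in EventDeserializationSchema; if they use double quotes or no quotes at all, the schema from the answer can stay as it is.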