I'm receiving CSV data in my class and I need to extract the values to create a POJO. I don't have to open a "file.csv" from a directory; the comma-separated elements are passed by Flink to the EventDeserializationSchema, and this one is used with the "Event Class" to process every single event.
Here is an example:
IN: "'Adam','Smith',66,....'12:01:00.000'" -> OUT: pojo
To do this I'm using: https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv
This is my Event class that should do the trick, but at the moment it isn't doing anything.
import java.io.Serializable;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {
    CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();
    CsvSchema schema = CsvSchema.emptySchema().withHeader();
    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
    return Pojo
}
This is my Pojo class:
public class Pojo {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Pojo(String firstName, String lastName, int age, String time) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.time = time;
    }
}
Any help to get the class to return a Pojo would be really appreciated.
This is an example with JSON: https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java
To make it work, you need to have a default constructor and getters/setters for the fields. I don't understand what you are going to do in Event and why there is also a Pojo, but assuming you want to deserialize the incoming string into the Event, something like this should work:
Event Pojo class:
public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    // Default constructor, needed so the mapper can instantiate the class.
    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    // age is private, so it is only reachable through this getter/setter pair.
    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
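The default-constructor requirement can be illustrated with plain reflection. This is only a rough analogy for what a data-binding library does when it instantiates a bean, not Jackson's actual code path, and the class names (WithArgsOnly, WithDefault, DefaultCtorSketch) are hypothetical. A class shaped like the original Pojo, with only an all-arguments constructor, has no no-argument constructor to call:

```java
// Hypothetical class shaped like the question's Pojo: only a constructor with arguments.
class WithArgsOnly {
    String name;

    WithArgsOnly(String name) {
        this.name = name;
    }
}

// Hypothetical class shaped like the Event above: default constructor plus setter.
class WithDefault {
    String name;

    WithDefault() {
    }

    void setName(String name) {
        this.name = name;
    }
}

class DefaultCtorSketch {
    // Returns true when the class declares a no-argument constructor.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(WithArgsOnly.class)); // prints false
        System.out.println(hasNoArgConstructor(WithDefault.class));  // prints true
    }
}
```

Flink's own POJO type handling also expects a public no-argument constructor, so the bean-style Event serves both purposes.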
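Deserialization schema with deserialize() implemented:
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class EventDeserializationSchema implements DeserializationSchema<Event> {
    private static final long serialVersionUID = 1L;

    // Column order must match the order of the values in each incoming record.
    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    // Maps the raw bytes of one record to an Event using the schema above.
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    // No element marks the end of the stream.
    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    // Tells Flink which type this schema produces.
    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}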
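One thing to watch: the sample input in the question wraps strings in single quotes, while Jackson's CSV schema uses the double quote as its default quote character, so values may arrive with the quotes still attached unless the schema is adjusted (CsvSchema offers withQuoteChar for this). To make the expected mapping concrete, here is a dependency-free sketch in plain Java. It is not the Jackson approach above, just a hand-rolled illustration; SimpleEvent, CsvLineSketch, parse and unquote are hypothetical names, and the four-column sample line is an assumption, since the question elides the middle columns.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the Event POJO, trimmed to plain fields.
class SimpleEvent {
    String firstName;
    String lastName;
    int age;
    String time;
}

class CsvLineSketch {
    // Removes one pair of surrounding single quotes, if present.
    static String unquote(String field) {
        String s = field.trim();
        if (s.length() >= 2 && s.startsWith("'") && s.endsWith("'")) {
            return s.substring(1, s.length() - 1);
        }
        return s;
    }

    // Maps one raw record (the byte[] a deserialize method would receive) to a POJO.
    // Assumes exactly four columns and no commas inside quoted fields.
    static SimpleEvent parse(byte[] message) {
        String line = new String(message, StandardCharsets.UTF_8);
        String[] parts = line.split(",", -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 columns, got " + parts.length);
        }
        SimpleEvent e = new SimpleEvent();
        e.firstName = unquote(parts[0]);
        e.lastName = unquote(parts[1]);
        e.age = Integer.parseInt(parts[2].trim());
        e.time = unquote(parts[3]);
        return e;
    }

    public static void main(String[] args) {
        byte[] raw = "'Adam','Smith',66,'12:01:00.000'".getBytes(StandardCharsets.UTF_8);
        SimpleEvent e = parse(raw);
        // prints: Adam Smith 66 12:01:00.000
        System.out.println(e.firstName + " " + e.lastName + " " + e.age + " " + e.time);
    }
}
```

A real CSV parser also handles escaped quotes and commas inside quoted fields, which is the main reason to prefer the Jackson-based schema over a manual split like this.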