I am creating a small utility with the Flink Java API to learn its functionality. I am trying to read a CSV file and just print it, and I have developed a POJO class for the structure of the data. When I execute the code, I don't see the right values: Integer values are replaced with zeros and String values with nulls. How do I map the data types for the attributes?
My Main Class:
package org.karthick.flinkLab;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import javax.xml.crypto.Data;
public class CSVFileRead {
public static void main(String[] args) throws Exception {
System.out.println("--CSV File Reader using Flink's Data Set API--");
ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<DataModel> csvInput = execEnv.readCsvFile("C:\\Flink\\Data\\IndividualDetails.csv")
My POJO class (DataModel.class):
package org.karthick.flinkLab;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple12;
import java.io.Serializable;
import java.util.Date;
public class DataModel<T extends Tuple>
        extends Tuple12<Integer,String,Date,Integer,String,String,String,String,String,String,Date,String>
        implements Serializable {
    public Integer id;
    public String government_id;
    public Date diagnosed_date;
    public Integer age;
    public String detected_city;
    public String detected_district;
    public String detected_state;
    public String nationality;
    public String current_status;
    public Date status_change_date;
    public String notes;
    public DataModel() {};
    public String getNotes() {
        return notes;
    }
    public Date getStatus_change_date() {
        return status_change_date;
    }
    public String getCurrent_status() {
        return current_status;
    }
    public String getNationality() {
        return nationality;
    }
    public String getDetected_state() {
        return detected_state;
    }
    public String getDetected_district() {
        return detected_district;
    }
    public String getDetected_city() {
        return detected_city;
    }
    public String gender ;
    public Date getDiagnosed_date() {
        return diagnosed_date;
    }
    public String getGender() {
        return gender;
    }
    public Integer getAge() {
        return age;
    }
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getGovernment_id() {
        return government_id;
    }
    public void setGovernment_id(String government_id) {
        this.government_id = government_id;
    }
}
When I execute the main method, I don't see the proper values. Sample result:
(0,,Tue May 19 16:50:38 IST 2020,0,,,,,,,Tue May 19 16:50:38 IST 2020,)
whereas I expect something like:
(2777,AP,Tue May 19 16:50:38 IST 2020,0,A,B,C,D,E,F,Tue May 19 16:50:38 IST 2020,G)
What could be missing here?
You are missing the column mapping from CSV to POJO. Adding the mapping will work. The mapping of the column names must follow the following two rules:
You can define the mapping as follows:
DataSet<DataModel> csvInput = execEnv.readCsvFile("C:\\Flink\\Data\\IndividualDetails.csv")
.pojoType(DataModel.class, "id", "age",.........);
The original code should have thrown an error, but it didn't. That could be a bug.
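To illustrate what a positional column mapping does, here is a minimal plain-Java sketch. It is not Flink's implementation and uses no Flink classes; it only shows the idea that the i-th field name in the list receives the i-th CSV column, which is why the order of names has to follow the column order in the file. The `Individual` class, the `mapLine` helper, and the three-column sample line are all hypothetical and exist only for this example.

```java
import java.lang.reflect.Field;

class CsvPojoMappingSketch {

    // Hypothetical POJO for illustration: public fields and a public
    // no-argument constructor, and it does not extend any Tuple type.
    public static class Individual {
        public Integer id;
        public String governmentId;
        public Integer age;

        public Individual() {}
    }

    // Assigns the i-th CSV column to the public field named fieldNames[i].
    // Only Integer and String fields are handled in this sketch.
    static <T> T mapLine(String line, Class<T> type, String... fieldNames) throws Exception {
        String[] columns = line.split(",", -1); // -1 keeps trailing empty columns
        T pojo = type.getDeclaredConstructor().newInstance();
        for (int i = 0; i < fieldNames.length && i < columns.length; i++) {
            // getField throws NoSuchFieldException if the name does not match a public field
            Field field = type.getField(fieldNames[i]);
            String raw = columns[i].trim();
            if (field.getType() == Integer.class) {
                field.set(pojo, raw.isEmpty() ? null : Integer.valueOf(raw));
            } else if (field.getType() == String.class) {
                field.set(pojo, raw);
            } else {
                throw new IllegalArgumentException("Unsupported field type: " + field.getType());
            }
        }
        return pojo;
    }

    public static void main(String[] args) throws Exception {
        Individual row = mapLine("2777,AP,45", Individual.class, "id", "governmentId", "age");
        System.out.println(row.id + " " + row.governmentId + " " + row.age);
    }
}
```

Swapping two names in the list, for example passing "governmentId" before "id", would make the sketch try to parse "AP" as an Integer and fail. Any field not named in the list is simply left at its default value.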