
How to read an Avro file into a Tuple class with Java in Flink


I'm trying to read an Avro file and perform some operations on it. Everything works fine except the aggregation functions; when I use them I get the exception below:

aggregating on field positions is only possible on tuple data types

I then changed my class to extend Tuple4 (as I have 4 fields), but now when I try to collect the results I get an AvroTypeException (Unknown Type : T0).

Here are my data and job classes:

// POJO whose public fields hold the four values of each nation record
public class Nation {

    public Integer N_NATIONKEY;
    public String N_NAME;
    public Integer N_REGIONKEY;
    public String N_COMMENT;

    public Integer getN_NATIONKEY() {
        return N_NATIONKEY;
    }

    public void setN_NATIONKEY(Integer n_NATIONKEY) {
        N_NATIONKEY = n_NATIONKEY;
    }

    public String getN_NAME() {
        return N_NAME;
    }

    public void setN_NAME(String n_NAME) {
        N_NAME = n_NAME;
    }

    public Integer getN_REGIONKEY() {
        return N_REGIONKEY;
    }

    public void setN_REGIONKEY(Integer n_REGIONKEY) {
        N_REGIONKEY = n_REGIONKEY;
    }

    public String getN_COMMENT() {
        return N_COMMENT;
    }

    public void setN_COMMENT(String n_COMMENT) {
        N_COMMENT = n_COMMENT;
    }

    // Public no-argument constructor
    public Nation() {
    }
}


public static void main(String[] args) throws Exception {
    Configuration parameters = new Configuration();

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


    Path path2 = new Path("/Users/violet/Desktop/nation.avro");

    AvroInputFormat<Nation> format = new AvroInputFormat<Nation>(path2,Nation.class);
    format.configure(parameters);
    DataSet<Nation> nation = env.createInput(format);
    nation.aggregate(Aggregations.SUM,0);


    JobExecutionResult res = env.execute();
}

Here is the tuple class; the job code is the same as above:

public class NationTuple extends Tuple4<Integer,String,Integer,String> {

Integer N_NATIONKEY(){ return this.f0;}
String N_NAME(){return this.f1;}
Integer N_REGIONKEY(){ return this.f2;}
String N_COMMENT(){ return this.f3;}

}

I tried with this class (using NationTuple everywhere instead of Nation) and got the AvroTypeException.


Solution

  • I don't think having your class extend Tuple4 is the right way to go. Instead, you should add a MapFunction to your topology that converts your Nation to a Tuple4.

    static Tuple4<Integer, String, Integer, String> toTuple(Nation nation) {
      return Tuple4.of(nation.N_NATIONKEY, ...);
    }
    

    And then in your topology call:

    inputData.map(p -> toTuple(p)).returns(new TypeHint<Tuple4<Integer, String, Integer, String>>(){});
    

    The only subtle part is that you need to provide a type hint: Java erases the generic type parameters of the lambda's return type, so Flink cannot work out on its own what kind of tuple your function returns.

    Another solution is to use field names instead of tuple field indices when specifying the keys for your aggregation. For example:

    groupBy("N_NATIONKEY", "N_REGIONKEY")
    

    This is all explained here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#specifying-keys
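
To see why the type hint mentioned above is needed, here is a minimal plain-Java sketch with no Flink dependency. `Hint` is a hypothetical stand-in for the anonymous-subclass trick that `new TypeHint<...>(){}` relies on; it is not Flink's actual class, and the other names are made up for illustration. It assumes a standard JDK, where a lambda's class carries no generic signature while an anonymous subclass does.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class TypeHintSketch {

    // Hypothetical stand-in for a type hint: an anonymous subclass of Hint<T>
    // records T in its class metadata, so it can be read back via reflection.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // The full generic type survives because it is written in the subclass declaration.
    static String hintedType() {
        return new Hint<List<String>>() {}.captured().getTypeName();
    }

    // A lambda's generated class implements only the raw Function interface,
    // so its declared return type List<String> cannot be recovered this way.
    static boolean lambdaExposesGenericInterface() {
        Function<Integer, List<String>> f = i -> Collections.singletonList("x");
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hintedType());
        System.out.println(lambdaExposesGenericInterface());
    }
}
```

The same reasoning applies to `p -> toTuple(p)`: the lambda alone does not tell Flink that the result is a `Tuple4<Integer, String, Integer, String>`, which is what the `returns(...)` call supplies.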