I have loaded an Avro file in a Flink Dataset:
AvroInputFormat<GenericRecord> test = new AvroInputFormat<GenericRecord>(
        new Path("PathToAvroFile"), GenericRecord.class);
DataSet<GenericRecord> DS = env.createInput(test);
DS.print();
and here are the results of printing DS:
{"N_NATIONKEY": 14, "N_NAME": "KENYA", "N_REGIONKEY": 0, "N_COMMENT": " pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t"}
{"N_NATIONKEY": 15, "N_NAME": "MOROCCO", "N_REGIONKEY": 0, "N_COMMENT": "rns. blithely bold courts among the closely regular packages use furiously bold platelets?"}
{"N_NATIONKEY": 16, "N_NAME": "MOZAMBIQUE", "N_REGIONKEY": 0, "N_COMMENT": "s. ironic, unusual asymptotes wake blithely r"}
{"N_NATIONKEY": 17, "N_NAME": "PERU", "N_REGIONKEY": 1, "N_COMMENT": "platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun"}
{"N_NATIONKEY": 18, "N_NAME": "CHINA", "N_REGIONKEY": 2, "N_COMMENT": "c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos"}
{"N_NATIONKEY": 19, "N_NAME": "ROMANIA", "N_REGIONKEY": 3, "N_COMMENT": "ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account"}
{"N_NATIONKEY": 20, "N_NAME": "SAUDI ARABIA", "N_REGIONKEY": 4, "N_COMMENT": "ts. silent requests haggle. closely express packages sleep across the blithely"}
Now I want to create a table from the DS DataSet with exactly the same schema as the Avro file, i.e. the columns should be N_NATIONKEY, N_NAME, N_REGIONKEY, and N_COMMENT.
I know using the line:
tableEnv.registerDataSet("tbTest", DS, "field1, field2, ...");
I can create a table and set the columns, but I want the columns to be inferred automatically from the data. Is that possible? In addition, I tried
tableEnv.registerDataSet("tbTest", DS);
but it creates a table with the schema:
root
|-- f0: GenericType<org.apache.avro.generic.GenericRecord>
GenericRecord is a black box for the Table & SQL API runtime because the number of fields and their data types are undefined. I would recommend using an Avro-generated class that extends SpecificRecord. Those specific types are also recognized by Flink's type system, and you can properly address individual fields with proper data types.
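A minimal sketch of the SpecificRecord approach. The class name Nation is hypothetical; it stands for whatever class the Avro compiler generates from your schema:

```java
// Assumes a class Nation generated by the Avro compiler from the schema
// (i.e. it extends SpecificRecordBase and carries the field types).
AvroInputFormat<Nation> input = new AvroInputFormat<>(
        new Path("PathToAvroFile"), Nation.class);
DataSet<Nation> nations = env.createInput(input);

// Because Flink's type system recognizes the generated class, the
// registered table should expose the schema's fields as columns
// (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT) instead of a single f0.
tableEnv.registerDataSet("tbTest", nations);
```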
Alternatively, you could implement custom UDFs that extract fields with a proper data type, e.g. getAvroInt(f0, "myField"), getAvroString(f0, "myField"), etc.
Some pseudocode for this:
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.functions.ScalarFunction;

public class AvroStringFieldExtract extends ScalarFunction {
    public String eval(GenericRecord r, String fieldName) {
        // Avro string fields are Utf8 instances, so convert via toString()
        return r.get(fieldName).toString();
    }
}

tableEnv.registerFunction("getAvroFieldString", new AvroStringFieldExtract());
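The registered function could then be applied in a SQL query. A sketch, using the f0 column name that appears in the schema printed in the question:

```java
// Extract N_NAME from the GenericRecord column f0 via the registered UDF.
// Table name and field names follow the question; the query shape is a sketch.
Table result = tableEnv.sqlQuery(
    "SELECT getAvroFieldString(f0, 'N_NAME') AS N_NAME FROM tbTest");
```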