We are building a Dataflow pipeline that reads JSON and writes it to a Parquet file. We are using the org.apache.beam.sdk.io.parquet package to write the file. ParquetIO.Sink allows you to write a PCollection of GenericRecord into a Parquet file (from https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html). Now we need to know how to convert a JsonObject (with a complex structure) to a GenericRecord.
We tried to generate the GenericRecord using GenericRecordBuilder (org.apache.avro.generic.GenericRecordBuilder), with JsonObject from com.google.gson.JsonObject, but we got stuck on how to generate a GenericRecord for a JsonArray of objects.
Our sample JSON
{
  "event_name": "added_to_cart",
  "event_id": "AMKL9877",
  "attributes": [
    {"key": "total", "value": "8982", "type": "double"},
    {"key": "order_id", "value": "AKM1011", "type": "string"}
  ]
}
Our schema
{
  "type": "record",
  "name": "event",
  "fields": [
    {
      "name": "event_name",
      "type": "string"
    },
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "attributes",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "attribute_data",
          "fields": [
            {
              "name": "key",
              "type": "string"
            },
            {
              "name": "value",
              "type": "string"
            },
            {
              "name": "type",
              "type": "string"
            }
          ]
        }
      }
    }
  ]
}
The code we use to convert the JsonObject to a GenericRecord with GenericRecordBuilder
JsonObject event = element.getAsJsonObject();
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA);
for (Schema.Field field : SCHEMA.getFields()) {
    System.out.println(field);
    String at_header = field.getProp(FIELD_AT_HEADER_PROPERTY);
    System.out.println(at_header);
    if (at_header != null && at_header.equals(Boolean.TRUE.toString())) {
        recordBuilder.set(field.name(), null);
    } else {
        JsonElement keyElement = event.get(field.name());
        recordBuilder.set(field.name(), getElementAsType(field.schema(), keyElement));
    }
}
return recordBuilder.build();
Object getElementAsType(Schema schema, JsonElement element) {
    if (element == null || element.isJsonNull())
        return null;
    switch (schema.getType()) {
        case BOOLEAN:
            return element.getAsBoolean();
        case DOUBLE:
            return element.getAsDouble();
        case FLOAT:
            return element.getAsFloat();
        case INT:
            return element.getAsInt();
        case LONG:
            return element.getAsLong();
        case NULL:
            return null;
        case ARRAY:
            ???
        case MAP:
            ???
        default:
            return element.getAsString();
    }
}
We need to know how to build a GenericRecord for complex types from JSON, such as an array of objects or a map. Thanks in advance.
I found my answer on this page: https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/generic/package-summary.html
A generic representation for Avro data.
This representation is best for applications which deal with dynamic data, whose schemas are not known until runtime.
Avro schemas are mapped to Java types as follows:
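For the ARRAY and MAP cases left open in the question, the relevant part of that mapping is that arrays are represented as a java.util.Collection, maps as a java.util.Map, and nested records as GenericRecord. Below is a minimal sketch of a recursive converter built on that idea. The class name JsonToAvro and the method name toAvro are hypothetical names chosen for illustration, and the sketch assumes the schema has no union types (for example ["null", "string"]), which would need an extra case.

```java
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Avro, Gson, or Beam.
public class JsonToAvro {

    // Recursively converts a Gson element into the Java object that
    // Avro's generic representation expects for the given schema.
    // Assumption: the schema contains no union types.
    public static Object toAvro(Schema schema, JsonElement element) {
        if (element == null || element.isJsonNull()) {
            return null;
        }
        switch (schema.getType()) {
            case RECORD: {
                // A nested JSON object becomes a GenericRecord, field by field.
                JsonObject object = element.getAsJsonObject();
                GenericRecord record = new GenericData.Record(schema);
                for (Schema.Field field : schema.getFields()) {
                    record.put(field.name(), toAvro(field.schema(), object.get(field.name())));
                }
                return record;
            }
            case ARRAY: {
                // Each array item is converted with the array's element schema.
                List<Object> items = new ArrayList<>();
                for (JsonElement item : element.getAsJsonArray()) {
                    items.add(toAvro(schema.getElementType(), item));
                }
                return new GenericData.Array<>(schema, items);
            }
            case MAP: {
                // Avro map keys are always strings; values use the map's value schema.
                Map<String, Object> values = new HashMap<>();
                for (Map.Entry<String, JsonElement> entry : element.getAsJsonObject().entrySet()) {
                    values.put(entry.getKey(), toAvro(schema.getValueType(), entry.getValue()));
                }
                return values;
            }
            case BOOLEAN:
                return element.getAsBoolean();
            case DOUBLE:
                return element.getAsDouble();
            case FLOAT:
                return element.getAsFloat();
            case INT:
                return element.getAsInt();
            case LONG:
                return element.getAsLong();
            case NULL:
                return null;
            default:
                // STRING and any type not handled above fall back to the string form.
                return element.getAsString();
        }
    }
}
```

Because RECORD is handled by the same method, calling toAvro(SCHEMA, element) on the whole event should return the top-level GenericRecord directly, with the attributes field populated as an array of attribute_data records.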