Tags: gson, google-cloud-dataflow, avro, apache-beam, parquet

How to convert a JsonObject (com.google.gson.JsonObject) to a GenericRecord (org.apache.avro.generic.GenericRecord)


We are building a Dataflow pipeline that reads JSON and writes it to a Parquet file, using the org.apache.beam.sdk.io.parquet package. ParquetIO.Sink lets you write a PCollection of GenericRecord to a Parquet file (see https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html). We need to know how to convert a JsonObject with a complex structure to a GenericRecord.

We tried to build the GenericRecord with GenericRecordBuilder (org.apache.avro.generic.GenericRecordBuilder), reading the input as a com.google.gson.JsonObject, but we got stuck on how to generate the GenericRecord value for a JsonArray of objects.

Our sample Json

{
    "event_name": "added_to_cart",
    "event_id": "AMKL9877",
    "attributes": [
        {"key": "total", "value": "8982", "type": "double"},
        {"key": "order_id", "value": "AKM1011", "type": "string"}
    ]
}

Our schema

{  
    "type":"record",
    "name":"event",
    "fields":[  
        {  
        "name":"event_name",
        "type":"string"
        },
        {  
        "name":"event_id",
        "type":"string"
        },
        {  
        "name":"attributes",
        "type":{  
            "type":"array",
            "items":{  
            "type":"record",
            "name":"attribute_data",
            "fields":[  
                {  
                "name":"key",
                "type":"string"
                },
                {  
                "name":"value",
                "type":"string"
                },
                {  
                "name":"type",
                "type":"string"
                }
            ]
            }
        }
        }
    ]
}

The code we use to convert a JsonObject to a GenericRecord with GenericRecordBuilder

JsonObject event = element.getAsJsonObject();
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA);

for (Schema.Field field:SCHEMA.getFields()) {
    System.out.println(field);
    String at_header = field.getProp(FIELD_AT_HEADER_PROPERTY);
    System.out.println(at_header);
    if(at_header != null && at_header.equals(Boolean.TRUE.toString())){
        recordBuilder.set(field.name(), null);
    }else{
        JsonElement keyElement = event.get(field.name());
        recordBuilder.set(field.name(), getElementAsType(field.schema(), keyElement));
    }
}

return recordBuilder.build();


Object getElementAsType(Schema schema, JsonElement element) { 
    if(element == null || element.isJsonNull())
        return null;
    switch(schema.getType()){
    case BOOLEAN:
        return element.getAsBoolean();
    case DOUBLE:
        return element.getAsDouble();
    case FLOAT:
        return element.getAsFloat();
    case INT:
        return element.getAsInt();
    case LONG:
        return element.getAsLong();
    case NULL:
        return null;
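    case ARRAY:
        // ??? (this is the part we do not know how to build)
    case MAP:
        // ??? (this is the part we do not know how to build)
    default:
        return element.getAsString();
    }
}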

We need to know how to build a GenericRecord for complex types from JSON, such as an array of objects or a map. Thanks in advance.


Solution

  • I found my answer on this page: https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/generic/package-summary.html

    A generic representation for Avro data.

    This representation is best for applications which deal with dynamic data, whose schemas are not known until runtime.

    Avro schemas are mapped to Java types as follows:

    • Schema records are implemented as GenericRecord.
    • Schema enums are implemented as GenericEnumSymbol.
    • Schema arrays are implemented as Collection.
    • Schema maps are implemented as Map.
    • Schema fixed are implemented as GenericFixed.
    • Schema strings are implemented as CharSequence.
    • Schema bytes are implemented as ByteBuffer.
    • Schema ints are implemented as Integer.
    • Schema longs are implemented as Long.
    • Schema floats are implemented as Float.
    • Schema doubles are implemented as Double.
    • Schema booleans are implemented as Boolean.
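
Applied to the question's getElementAsType, the mapping above suggests returning a Collection for ARRAY, a Map for MAP, and a nested GenericRecord for RECORD. The following is a minimal sketch of that idea, not a definitive implementation. The class name JsonToAvro, the method toAvro, and the constants SCHEMA_JSON and SAMPLE_JSON are hypothetical names introduced for illustration. It assumes Avro and Gson are on the classpath, and it does not handle UNION, ENUM, FIXED, or BYTES schemas.

```java
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JsonToAvro {

    // The schema from the question, written compactly.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"event\",\"fields\":["
        + "{\"name\":\"event_name\",\"type\":\"string\"},"
        + "{\"name\":\"event_id\",\"type\":\"string\"},"
        + "{\"name\":\"attributes\",\"type\":{\"type\":\"array\",\"items\":{"
        + "\"type\":\"record\",\"name\":\"attribute_data\",\"fields\":["
        + "{\"name\":\"key\",\"type\":\"string\"},"
        + "{\"name\":\"value\",\"type\":\"string\"},"
        + "{\"name\":\"type\",\"type\":\"string\"}]}}}]}";

    // The sample event from the question.
    static final String SAMPLE_JSON =
        "{\"event_name\":\"added_to_cart\",\"event_id\":\"AMKL9877\",\"attributes\":["
        + "{\"key\":\"total\",\"value\":\"8982\",\"type\":\"double\"},"
        + "{\"key\":\"order_id\",\"value\":\"AKM1011\",\"type\":\"string\"}]}";

    // Hypothetical helper: recursively converts a Gson element into the Java
    // type that Avro's generic representation expects for the given schema.
    static Object toAvro(Schema schema, JsonElement element) {
        if (element == null || element.isJsonNull()) {
            return null;
        }
        switch (schema.getType()) {
            case RECORD: {
                // Records become GenericRecord; each field is converted recursively.
                JsonObject object = element.getAsJsonObject();
                GenericRecord record = new GenericData.Record(schema);
                for (Schema.Field field : schema.getFields()) {
                    record.put(field.name(), toAvro(field.schema(), object.get(field.name())));
                }
                return record;
            }
            case ARRAY: {
                // Arrays become a Collection whose items follow the element schema.
                List<Object> items = new ArrayList<>();
                for (JsonElement item : element.getAsJsonArray()) {
                    items.add(toAvro(schema.getElementType(), item));
                }
                return new GenericData.Array<>(schema, items);
            }
            case MAP: {
                // Maps become a java.util.Map with string keys.
                Map<String, Object> map = new HashMap<>();
                for (Map.Entry<String, JsonElement> entry : element.getAsJsonObject().entrySet()) {
                    map.put(entry.getKey(), toAvro(schema.getValueType(), entry.getValue()));
                }
                return map;
            }
            case BOOLEAN: return element.getAsBoolean();
            case DOUBLE:  return element.getAsDouble();
            case FLOAT:   return element.getAsFloat();
            case INT:     return element.getAsInt();
            case LONG:    return element.getAsLong();
            case NULL:    return null;
            default:
                // STRING lands here; UNION, ENUM, FIXED and BYTES are not handled in this sketch.
                return element.getAsString();
        }
    }

    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        JsonObject event = new Gson().fromJson(SAMPLE_JSON, JsonObject.class);
        GenericRecord record = (GenericRecord) toAvro(schema, event);
        System.out.println(record);
    }
}
```

Because the top-level schema is itself a record, calling toAvro(SCHEMA, event) on the whole JsonObject would replace the field loop in the question. Fields that need special treatment, such as the ones flagged by FIELD_AT_HEADER_PROPERTY, would still have to be handled separately.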