Convert Avro file to JSON with reader schema


I would like to deserialize Avro data on the command line with a reader schema that is different from the writer schema. I can specify the writer schema on serialization, but I cannot find a way to specify a reader schema during deserialization.

record.json (data file):

{"test1": 1, "test2": 2}

writer.avsc (writer schema):

{
    "type": "record",
    "name": "pouac",
    "fields": [
        {
            "name": "test1",
            "type": "int"
        },
        {
            "name": "test2",
            "type": "int"
        }
    ]
}

reader.avsc (reader schema):

{
    "type": "record",
    "name": "pouac",
    "fields": [{
        "name": "test2",
         "type": "int",
         "aliases": ["test1"]
    }]
}

Serializing data:

$ java -jar avro-tools-1.8.2.jar fromjson --schema-file writer.avsc record.json > record.avro

For deserializing data, I tried the following:

$ java -jar avro-tools-1.8.2.jar tojson --schema-file reader.avsc record.avro
Exception in thread "main" joptsimple.UnrecognizedOptionException: 'schema-file' is not a recognized option
...

I'm looking primarily for a command-line instruction because I'm not so comfortable writing Java code, but I'd be happy with Java code that I can compile myself. What I'm really interested in is the exact deserialization result. (The more fundamental issue at stake is described in this conversation on a fastavro PR that I opened to implement aliases.)


Solution

  • The avro-tools tojson target is only meant as a dump tool for translating a binary-encoded Avro file to JSON. The writer schema always accompanies the records in the Avro file, as outlined in the link below. As a result, avro-tools cannot override it.

    http://avro.apache.org/docs/1.8.2/#compare
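
    To see that the writer schema travels with the data, you can read it back out of the file header. avro-tools has a getschema command for this; the sketch below does the same thing with only the JDK, following the object container file layout (the magic bytes Obj\x01, then a metadata map whose avro.schema entry holds the schema JSON). The class name AvroHeaderPeek and its helper methods are made up for illustration, and the code assumes Java 11+ and a well-formed file.

```java
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch: reads only the header of an Avro object container file. */
final class AvroHeaderPeek {
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Reads one zigzag-encoded variable-length long (Avro's binary encoding of "long"). */
    static long readLong(InputStream in) throws IOException {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new EOFException("truncated varint");
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo the zigzag mapping
    }

    /** Reads a length-prefixed byte sequence (used for both map keys and values). */
    static byte[] readBytes(InputStream in) throws IOException {
        int length = (int) readLong(in);
        byte[] data = in.readNBytes(length); // Java 11+
        if (data.length != length) throw new EOFException("truncated value");
        return data;
    }

    /** Returns the header metadata, with values decoded as UTF-8 text. */
    static Map<String, String> readMetadata(InputStream in) throws IOException {
        if (!Arrays.equals(in.readNBytes(4), MAGIC)) {
            throw new IOException("not an Avro object container file");
        }
        Map<String, String> meta = new LinkedHashMap<>();
        // A map is written as blocks of entries; a block count of 0 ends it.
        for (long count = readLong(in); count != 0; count = readLong(in)) {
            if (count < 0) {   // negative count: the block's byte size follows
                count = -count;
                readLong(in);  // the size is not needed here
            }
            for (long i = 0; i < count; i++) {
                String key = new String(readBytes(in), StandardCharsets.UTF_8);
                meta.put(key, new String(readBytes(in), StandardCharsets.UTF_8));
            }
        }
        return meta;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java AvroHeaderPeek <file.avro>");
            return;
        }
        try (InputStream in = new FileInputStream(args[0])) {
            System.out.println(readMetadata(in).get("avro.schema"));
        }
    }
}
```

    Compiled and run as java AvroHeaderPeek record.avro, this should print the writer schema that fromjson stored in the file, which is the schema tojson reads with.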

    I am not aware of a stand-alone tool that does what you want, so I think you'll need to do some programming. Avro supports many languages, including Python, but the capabilities are not uniform across them; in my experience Java is the most advanced. For example, Python lacks the ability to specify a reader schema on the DataFileReader, which would help achieve what you want:

    https://github.com/apache/avro/blob/master/lang/py/src/avro/datafile.py#L224

    The closest you can get in Python is the following:

    import avro.schema as avsc
    import avro.datafile as avdf
    import avro.io as avio

    # Parsed for reference only: nothing below consumes it.
    with open("reader.avsc") as schema_file:
        reader_schema = avsc.parse(schema_file.read())

    # need ability to inject reader schema as 3rd arg
    with avdf.DataFileReader(open("record.avro", "rb"), avio.DatumReader()) as reader:
        for record in reader:
            print(record)
    

    For the schemas and data you've outlined, the expected behaviour is undefined: the writer's test1 (through the alias) and test2 (by name) both map to the reader's single test2 field, so the read should fail with an error.

    This behaviour can be verified with the following Java code:

    package ca.junctionbox.soavro;
    
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaValidationException;
    import org.apache.avro.SchemaValidationStrategy;
    import org.apache.avro.SchemaValidator;
    import org.apache.avro.SchemaValidatorBuilder;
    
    import java.util.ArrayList;
    
    public class Main {
        public static final String V1 = "{\n" +
                "    \"type\": \"record\",\n" +
                "    \"name\": \"pouac\",\n" +
                "    \"fields\": [\n" +
                "        {\n" +
                "            \"name\": \"test1\",\n" +
                "            \"type\": \"int\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"name\": \"test2\",\n" +
                "            \"type\": \"int\"\n" +
                "        }\n" +
                "    ]\n" +
                "}";
    
        public static final String V2 = "{\n" +
                "    \"type\": \"record\",\n" +
                "    \"name\": \"pouac\",\n" +
                "    \"fields\": [{\n" +
                "        \"name\": \"test2\",\n" +
                "         \"type\": \"int\",\n" +
                "         \"aliases\": [\"test1\"]\n" +
                "    }]\n" +
                "}";
    
        public static void main(final String[] args) {
            final SchemaValidator sv = new SchemaValidatorBuilder()
                    .canBeReadStrategy()
                    .validateAll();
            final Schema sv1 = new Schema.Parser().parse(V1);
            final Schema sv2 = new Schema.Parser().parse(V2);
            final ArrayList<Schema> existing = new ArrayList<>();
            existing.add(sv1);
    
            try {
                sv.validate(sv2, existing);
                System.out.println("Good to go!");
            } catch (SchemaValidationException e) {
                e.printStackTrace();
            }
        }
    }
    

    This yields the following output:

    org.apache.avro.SchemaValidationException: Unable to read schema: 
    {
      "type" : "record",
      "name" : "pouac",
      "fields" : [ {
        "name" : "test2",
        "type" : "int",
        "aliases" : [ "test1" ]
      } ]
    }
    using schema:
    {
      "type" : "record",
      "name" : "pouac",
      "fields" : [ {
        "name" : "test1",
        "type" : "int"
      }, {
        "name" : "test2",
        "type" : "int"
      } ]
    }
        at org.apache.avro.ValidateMutualRead.canRead(ValidateMutualRead.java:70)
        at org.apache.avro.ValidateCanBeRead.validate(ValidateCanBeRead.java:39)
        at org.apache.avro.ValidateAll.validate(ValidateAll.java:51)
        at ca.junctionbox.soavro.Main.main(Main.java:47)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
        at java.lang.Thread.run(Thread.java:748)
    

    Aliases are typically used for backwards compatibility in schema evolution, allowing disparate or legacy field names to be mapped to a common name. Given that your writer schema doesn't treat the test1 and test2 fields as "optional" through the use of unions, I can't see a scenario in which you'd want this transformation. If you want to "drop" the test1 field, you can do so by excluding it from the v2 schema specification. Any reader that can apply a reader schema would then ignore test1 when using the v2 schema definition.

    To illustrate what I mean by evolution:

    v1 schema

    {
      "type": "record",
      "name": "pouac",
      "fields": [
        {
            "name": "test1",
            "type": "int"
        }]
    }
    

    v2 schema

    {
      "type": "record",
      "name": "pouac",
      "fields": [
        {
            "name": "test2",
            "type": "int",
            "aliases": ["test1"]
        }]
    }
    

    You could have terabytes of data in the v1 format and then introduce the v2 format, which renames the test1 field to test2. The alias would allow you to run map-reduce jobs, Hive queries, etc. on both v1 and v2 data without first rewriting all the old v1 data. Note that this assumes no change in the type or the semantic meaning of the field.
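
    The difference between this evolution case and the schemas in the question can be sketched without Avro at all. The class below is a hypothetical illustration, not Avro's resolution code: it matches each writer field to a reader field by name or alias and, as an assumption of this sketch, treats two writer fields landing on the same reader field as an error. It needs Java 9+ for List.of and Map.of.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; AliasSketch and resolve are made-up names. */
final class AliasSketch {

    /**
     * Maps each reader field name to the writer field that would feed it.
     *
     * @param writerFields  field names in the writer schema, in order
     * @param readerAliases reader field name mapped to that field's aliases
     * @throws IllegalStateException if two writer fields match one reader field
     */
    static Map<String, String> resolve(List<String> writerFields,
                                       Map<String, List<String>> readerAliases) {
        Map<String, String> readerToWriter = new LinkedHashMap<>();
        for (String writerField : writerFields) {
            for (Map.Entry<String, List<String>> reader : readerAliases.entrySet()) {
                boolean matches = reader.getKey().equals(writerField)
                        || reader.getValue().contains(writerField);
                if (!matches) {
                    continue;
                }
                String previous = readerToWriter.put(reader.getKey(), writerField);
                if (previous != null) {
                    throw new IllegalStateException("Ambiguous: writer fields '" + previous
                            + "' and '" + writerField + "' both map to reader field '"
                            + reader.getKey() + "'");
                }
            }
            // A writer field that matches nothing is simply dropped by the reader.
        }
        return readerToWriter;
    }

    public static void main(String[] args) {
        Map<String, List<String>> v2WithAlias = Map.of("test2", List.of("test1"));
        Map<String, List<String>> v2NoAlias = Map.of("test2", List.of());

        // v1 data (only test1) read with the v2 schema: the rename works.
        System.out.println(resolve(List.of("test1"), v2WithAlias)); // prints {test2=test1}

        // Dropping test1 by leaving it (and the alias) out of the reader schema.
        System.out.println(resolve(List.of("test1", "test2"), v2NoAlias)); // prints {test2=test2}

        // The schemas from the question: both writer fields claim test2.
        try {
            resolve(List.of("test1", "test2"), v2WithAlias);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    The first two calls correspond to the rename and the "drop" described above; the third reproduces the conflict in the question's schemas, which this sketch rejects rather than guessing which writer field should win.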