Search code examples
python-3.xavro

Handling nested schemas of AVRO with Python3


I'm using Avro 1.8.2 with Python 3.7 (pip install avro-python3) to handle the Avro format.

Here's the sample code from AVRO website

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# Parse the schema definition read from user.avsc.
schema = avro.schema.parse(open("user.avsc", "rb").read())

# Write two user records to users.avro using that schema.
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

# Read the records back from users.avro and print each one.
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print user  # NOTE: Python 2 print statement; a SyntaxError under Python 3
reader.close()

This code doesn't work because the parse method was renamed to Parse, and its second parameter - which is needed to support nested schemas - was removed.

So the question is how to read/write AVRO with nested schemas in python3?


Solution

  • After reading the source code of the Avro library, I figured out a way to do that. Here's the code

    import json
    
    import avro.schema
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    def create_schema():
        """Build the Account record schema, which nests Transaction records.

        Both schemas are loaded through one shared ``avro.schema.Names``
        registry so that Account can refer to Transaction by name.

        Returns:
            The schema object produced for the Account record.
        """
        names = avro.schema.Names()

        def load(schema_dict):
            # Every schema goes through the same registry so that later
            # schemas can reference earlier ones by name.
            return avro.schema.SchemaFromJSONData(schema_dict, names=names)

        transaction_schema_dict = {
            "namespace": "myavro",
            "type": "record",
            "name": "Transaction",
            "fields": [
                {"name": "name", "type": "string"},
            ],
        }
        account_schema_dict = {
            "namespace": "myavro",
            "type": "record",
            "name": "Account",
            "fields": [
                {"name": "name", "type": "string"},
                {
                    "name": "transaction",
                    "type": ["null", {"type": "array", "items": "Transaction"}],
                    # The default of a union field must match the union's
                    # first branch; for "null" that is JSON null (None in
                    # Python), not the string "null".
                    "default": None,
                },
            ],
        }

        # Transaction must be loaded first: Account references it by name.
        load(transaction_schema_dict)
        return load(account_schema_dict)
    
    def write_avro_file(file_path, schema, data):
        """Write ``data`` as a single record to the Avro file at ``file_path``.

        The file is opened in binary write mode and the record is appended
        through a ``DataFileWriter`` constructed with ``schema``.
        """
        with open(file_path, 'wb') as out_file:
            with DataFileWriter(out_file, DatumWriter(), schema) as avro_writer:
                avro_writer.append(data)
    
    def print_avro_file(file_path):
        """Print every record read from the Avro file at ``file_path``."""
        with open(file_path, 'rb') as in_file:
            with DataFileReader(in_file, DatumReader()) as avro_reader:
                for record in avro_reader:
                    print(record)
    
    def run():
        """Write one sample account with two transactions, then print the file."""
        output_path = 'account.avro'
        transactions = [
            {'name': 'my transaction 1'},
            {'name': 'my transaction 2'},
        ]
        sample_account = {'name': 'my account', 'transaction': transactions}

        write_avro_file(output_path, create_schema(), sample_account)
        print_avro_file(output_path)
    
    # Run the demo only when executed as a script, so importing this module
    # does not write or read account.avro as a side effect.
    if __name__ == '__main__':
        run()
    

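    The key is to use the SchemaFromJSONData function instead of Parse, and to pass the same Names object to every call so that the schemas can reference each other. Note that the order of the schema-loading calls matters: a schema must be loaded before any schema that references it by name (here, Transaction before Account).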