Tags: python, avro, fastavro

fastavro - Convert json file into avro file


I'm a bit new to Avro and Python.

I am trying to do a simple conversion to Avro using the fastavro library, because the native Apache Avro library is just a bit too slow.

I want to:

1. Take a JSON file.
2. Convert the data to Avro.

My problem is that my JSON doesn't seem to be in the correct 'record' format for conversion to Avro. I even tried putting my JSON into a string variable and making it look similar to the syntax in the fastavro docs at https://fastavro.readthedocs.io/en/latest/writer.html:

    {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
    {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
    {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},

Here is my code:

    from fastavro import json_writer, parse_schema, writer
    import json

    key = "test.json"
    schemaFileName = "test_schema.avsc"

    # Load and parse the Avro schema
    with open(r'C:/Path/to/file/' + schemaFileName) as sc:
        w = json.load(sc)

    schema = parse_schema(w)

    # Load the JSON data
    with open(r'C:/Path/to/file/' + key) as js:
        x = json.load(js)

    # Write the data to a deflate-compressed Avro file
    with open('C:/Path/to/file/output.avro', 'wb') as out:
        writer(out, schema, x, codec='deflate')

Here is what I get as output:

      File "avropython.py", line 26, in <module>
        writer(out, schema,x, codec='deflate')
      File "fastavro\_write.pyx", line 608, in fastavro._write.writer
    ValueError: "records" argument should be an iterable, not dict

My JSON file (test.json) and schema (test_schema.avsc), respectively:

    {
      "joined": false,
      "toward": {
        "selection": "dress",
        "near": true,
        "shoulder": false,
        "fine": -109780201.3804388,
        "pet": {
          "stood": "saddle",
          "live": false,
          "leather": false,
          "tube": false,
          "over": false,
          "impossible": true
        },
        "higher": false
      },
      "wear": true,
      "asleep": "door",
      "connected": true,
      "stairs": -1195512399.5000324
    }

    {
      "name": "MyClass",
      "type": "record",
      "namespace": "com.acme.avro",
      "fields": [
        {
          "name": "joined",
          "type": "boolean"
        },
        {
          "name": "toward",
          "type": {
            "name": "toward",
            "type": "record",
            "fields": [
              {
                "name": "selection",
                "type": "string"
              },
              {
                "name": "near",
                "type": "boolean"
              },
              {
                "name": "shoulder",
                "type": "boolean"
              },
              {
                "name": "fine",
                "type": "float"
              },
              {
                "name": "pet",
                "type": {
                  "name": "pet",
                  "type": "record",
                  "fields": [
                    {
                      "name": "stood",
                      "type": "string"
                    },
                    {
                      "name": "live",
                      "type": "boolean"
                    },
                    {
                      "name": "leather",
                      "type": "boolean"
                    },
                    {
                      "name": "tube",
                      "type": "boolean"
                    },
                    {
                      "name": "over",
                      "type": "boolean"
                    },
                    {
                      "name": "impossible",
                      "type": "boolean"
                    }
                  ]
                }
              },
              {
                "name": "higher",
                "type": "boolean"
              }
            ]
          }
        },
        {
          "name": "wear",
          "type": "boolean"
        },
        {
          "name": "asleep",
          "type": "string"
        },
        {
          "name": "connected",
          "type": "boolean"
        },
        {
          "name": "stairs",
          "type": "float"
        }
      ]
    }

If anyone could help me out, it would be greatly appreciated!!


Solution

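  • As the error ValueError: "records" argument should be an iterable, not dict says, writer expects an iterable of records (for example, a list of dicts), but json.load returns a single dict when the file contains one JSON object. One way to fix this is to wrap the record in a list on your last line: writer(out, schema, [x], codec='deflate')

    Alternatively, schemaless_writer can write a single record: https://fastavro.readthedocs.io/en/latest/writer.html#fastavro._write_py.schemaless_writer. Note that it writes only the encoded record, with no Avro file header or embedded schema, so the output is not a standard Avro container file and has to be read back with schemaless_reader and the same schema.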