Tags: scala, apache-spark, avro, parquet

Avro: convert UNION schema to RECORD schema


I have an auto-generated Avro schema for a simple class hierarchy:

trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T

It looks like this:

 [{
  "name": "org.example.schema.raw.A",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "value",
    "type": "int"
  }]
}, {
  "name": "org.example.schema.raw.B",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "history",
    "type": {
      "type": "array",
      "items": "string"
    }
  }]
}]
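
For reference, reading a JSON line into a GenericRecord with this union schema looks roughly like the sketch below (the schema file name and the sample input are illustrative, not from my actual code):

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Parse the union schema shown above (the path is illustrative)
val schema = new Schema.Parser().parse(new File("schema.avsc"))
val reader = new GenericDatumReader[GenericRecord](schema)

// Avro's JSON encoding of a union wraps the value in its branch name
val json = """{"org.example.schema.raw.A": {"name": "a1", "value": 1}}"""
val record: GenericRecord = reader.read(null, DecoderFactory.get().jsonDecoder(schema, json))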

This schema works well for reading data from JSON into GenericRecord using the plain Avro API. The next thing I'm trying to achieve is storing all such GenericRecord objects in a single Parquet file using AvroParquetWriter:

val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()

This code fails on the first line with:

java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)

No wonder: AvroSchemaConverter contains the following lines:

if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
  throw new IllegalArgumentException("Avro schema must be a record.");
}

And my schema type is UNION. Any ideas or help with mapping (merging) this UNION schema into a RECORD schema, or any other suggestions, would be greatly appreciated.

SOLUTION

1) Read JSON from the input into GenericRecord objects using the union schema.

2) Get or create an AvroParquetWriter for the record's type:

val writer = writers.getOrElseUpdate(record.getSchema.getFullName,
  new AvroParquetWriter[GenericRecord](getPath(record.getSchema.getFullName), record.getSchema))
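
Here writers is a mutable map from type name to writer and getPath maps a type name to its output file; a minimal sketch of those two (the output directory is just an example):

import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.avro.generic.GenericRecord
import parquet.avro.AvroParquetWriter

// One writer per concrete record type, keyed by the full schema name
val writers = mutable.Map.empty[String, AvroParquetWriter[GenericRecord]]

// Each type gets its own Parquet file under a common output directory
def getPath(typeName: String): Path = new Path(s"/tmp/data/$typeName.parquet")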

3) Write the record to that writer's file:

writer.write(record)

4) Close all writers once all data has been consumed from the input:

writers.values.foreach(_.close())

5) Load the data from the directory into a Spark SQL DataFrame:

val df = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/data/")

6) The data can now be processed or stored as is; it has already been merged by Spark:

df.write.format("parquet").save("merged.parquet")

ALTERNATIVE SOLUTION

  • To answer your question about merging: you could use the case class Merged(name: String, value: Option[Int], history: Option[Array[String]]) and use its generated schema to write your data (see the sketch after this list). In general, if you have a schema that is forward compatible with both A and B, it will write both correctly.

    Or, since, as you said, Avro won't let you write all your data in the same file, maybe you can split the output by type and write one file per type? I know I'd probably do that in most use cases I can think of, but maybe it's not applicable for you.
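
A minimal sketch of the merged-record idea from the first bullet, writing the merged schema by hand rather than generating it from the case class (the toMerged helper, sample values and output path are illustrative):

import org.apache.hadoop.fs.Path
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import parquet.avro.AvroParquetWriter
import scala.collection.JavaConverters._

// Merged RECORD schema: the shared field stays required, type-specific fields become nullable
val mergedSchema = new Schema.Parser().parse(
  """{
    |  "name": "org.example.schema.Merged", "type": "record",
    |  "fields": [
    |    {"name": "name", "type": "string"},
    |    {"name": "value", "type": ["null", "int"], "default": null},
    |    {"name": "history", "type": ["null", {"type": "array", "items": "string"}], "default": null}
    |  ]
    |}""".stripMargin)

// Map the domain classes from the question onto the merged shape
def toMerged(t: T): GenericRecord = {
  val r = new GenericData.Record(mergedSchema)
  r.put("name", t.name)
  t match {
    case A(_, value)   => r.put("value", Int.box(value))
    case B(_, history) => r.put("history", history.toList.asJava)
  }
  r
}

// A single writer now accepts both A and B records
val writer = new AvroParquetWriter[GenericRecord](new Path("merged.parquet"), mergedSchema)
Seq(A("a1", 1), B("b1", Array("x", "y"))).foreach(t => writer.write(toMerged(t)))
writer.close()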