Tags: scala, apache-spark, avro, spark-avro

Avro backward compatibility doesn't work as expected


I have two Avro schemas, V1 and V2, which are read in Spark as below:

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.avro.functions._

// Schema passed to from_avro to decode the Kafka value
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/V1.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

val output = df
  .select(from_avro($"value", jsonFormatSchema) as "avroFields")

V1, with two fields "one" and "two":

{
  "name": "test",
  "namespace": "foo.bar",
  "type": "record",
  "fields": [
    {
      "name": "one",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "two",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

V2, with a new field "three":

{
  "name": "test",
  "namespace": "foo.bar",
  "type": "record",
  "fields": [
    {
      "name": "one",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "two",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "three",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

Scenario: the writer uses V1 to write and the reader uses V2 to decode the Avro record. My expectation is to see field "three" populated with its default, which is null. Instead I run into the exception below from my Spark job.

Am I missing something here? My understanding is that Avro supports backward compatibility.

Exception in thread "main" java.io.EOFException
  at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
  at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
  at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
  at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
  at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
  at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
  at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)

Solution

  • You always have to decode Avro data with the exact schema it was written with. This is because Avro uses untagged data to be more compact, and therefore requires the writer's schema to be present at decoding time.

    So, when you read with your V2 schema, the decoder looks for field "three" (more precisely, the union index for that field) in the byte stream, runs past the end of the data, and throws the EOFException you see.

    What you can do is resolve data decoded with the writer's schema against a reader schema; the Avro Java API supports that directly, e.g. SpecificDatumReader(Schema writer, Schema reader) (see the sketch after this answer).

    Protocol Buffers or Thrift do what you want; they are tagged formats. Avro expects the schema to travel with the data, for example in an Avro container file.
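
    Here is a minimal sketch of that resolution step outside Spark, using Avro's generic API (GenericDatumReader also takes a writer and a reader schema). The local file names V1.avsc/V2.avsc and the sample field values are placeholders, not from the question:

    import java.io.ByteArrayOutputStream
    import java.nio.file.{Files, Paths}

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}

    // Hypothetical local copies of the two schemas above
    val writerSchema = new Schema.Parser().parse(new String(Files.readAllBytes(Paths.get("V1.avsc"))))
    val readerSchema = new Schema.Parser().parse(new String(Files.readAllBytes(Paths.get("V2.avsc"))))

    // Encode a record with the writer's schema (V1)
    val record = new GenericData.Record(writerSchema)
    record.put("one", "foo")
    record.put("two", "bar")

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
    encoder.flush()
    val bytes = out.toByteArray

    // Decoding with V2 alone fails with the EOFException from the question
    // (roughly what from_avro with the V2 schema ends up doing here):
    // new GenericDatumReader[GenericRecord](readerSchema).read(null, decoder)

    // Decoding with writer AND reader schema resolves "three" to its default
    val decoder  = DecoderFactory.get().binaryDecoder(bytes, null)
    val reader   = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
    val resolved = reader.read(null, decoder)
    println(resolved.get("three"))  // null

    With the writer's schema available, the resolving decoder knows that field "three" is absent from the encoded data and fills in its declared default instead of trying to read past the end of the buffer.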