Tags: java, apache-spark, avro, spark-avro

org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING


I am doing a simple JSON-to-Avro record conversion, but I keep getting this exception. I have tried many approaches, including more than 15 solutions from Stack Overflow and elsewhere online.

My schema file looks like this:

{
    "namespace": "test",
    "type": "record",
    "name": "root",
    "doc": "This stream contains raw data.",
    "fields": [
        {
            "name": "aaa",
            "doc": "You should not edit this portion.",
            "type": {
                "type": "record",
                "name": "EnterpriseEventEnvelopeRecord",
                "fields": [
                    {
                        "name": "eventId",
                        "type": "string",
                        "default": "",
                        "doc": "Unique Identifier."
                    },
                    {
                        "name": "eventAction",
                        "type": [
                            "null",
                            {
                                "type": "enum",
                                "name": "actionTypes",
                                "symbols": [
                                    "Updated",
                                    "Created",
                                    "Requested",
                                    "Deleted",
                                    "Verified",
                                    "Received",
                                    "Completed",
                                    "Failed",
                                    "Abandoned"
                                ]
                            }
                        ],
                        "default": null,
                        "doc": "A verb indicating what happened."
                    }
                ]
            }
        }
    ]
}

My input JSON:

{"aaa" : {"eventId" : "omar", "eventAction" : "Requested"}}

My Class:

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;

import java.io.*;
import java.nio.charset.StandardCharsets;

import static java.nio.file.Files.readAllBytes;
import static java.nio.file.Paths.get;

/**
 * Command-line helper that converts a JSON document into an in-memory Avro data file.
 *
 * <p>The input must use Avro's JSON encoding, which is stricter than plain JSON. A non-null
 * value of a union type must be wrapped in an object whose single key is the branch's full
 * type name, for example {@code "eventAction": {"test.actionTypes": "Requested"}}. A bare
 * {@code "eventAction": "Requested"} is rejected by the decoder with an
 * {@code AvroTypeException}.
 */
public class FeedbackEvent {

    /**
     * Reads a schema and a JSON document from disk and converts the document to Avro.
     *
     * @param args optional {@code [jsonFile [schemaFile]]}; each missing path falls back to the
     *     original hard-coded default ({@code d:/aaa.txt}, {@code d:/aaa.avsc})
     * @throws Exception if a file cannot be read, the schema is invalid, or the JSON does not
     *     match the schema
     */
    public static void main(String[] args) throws Exception {
        String jsonFile = args.length > 0 ? args[0] : "d:/aaa.txt";
        String schemaFile = args.length > 1 ? args[1] : "d:/aaa.avsc";
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        // Decode explicitly as UTF-8 (JSON's standard encoding) instead of the platform charset.
        String json = new String(readAllBytes(get(jsonFile)), StandardCharsets.UTF_8);
        byte[] avro = jsonToAvro(json, schema);
        // The result is binary Avro container data, so report its size instead of printing raw bytes.
        System.out.println("Wrote " + avro.length + " bytes of Avro data");
        System.out.println("Done....");
    }

    /**
     * Decodes every JSON-encoded record in {@code json} against {@code schema} and writes
     * them into an Avro object container file held in memory.
     *
     * @param json   zero or more records in Avro's JSON encoding (see the class comment about
     *               union values)
     * @param schema schema used both to decode the JSON and to write the container file
     * @return the bytes of the complete Avro data file
     * @throws IOException if the JSON does not match the schema or the data cannot be written
     */
    public static byte[] jsonToAvro(String json, Schema schema) throws IOException {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        // Decoding directly from the String avoids the platform-charset json.getBytes() round trip.
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);

        // try-with-resources guarantees the writer is closed (flushing its last block) even when
        // decoding fails; the previous version only flushed and never closed it.
        try (DataFileWriter<GenericRecord> writer =
                new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, output);
            while (true) {
                GenericRecord datum;
                try {
                    datum = reader.read(null, decoder);
                } catch (EOFException endOfInput) {
                    // Reading past the last record throws EOFException; that ends the input.
                    break;
                }
                writer.append(datum);
            }
        }
        return output.toByteArray();
    }
}

Solution

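  • When a union value is not null, Avro's JSON encoding requires you to state which union branch the value belongs to. Please refer to the "Unions" part of the JSON Encoding section in the Avro specification.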

    In your case the branch is the enum actionTypes, so the JSON should look like this:

    {
      "aaa": {
        "eventId": "omar",
        "eventAction": {
            "test.actionTypes": "Requested"
        }
      }
    }
    

    Notice that the branch is named by its full name, including the namespace (test.actionTypes), not just actionTypes. A clear explanation is available in this Stack Overflow thread.

    Hope this helps.