Search code examples
json · apache-kafka · avro

Avro schema when the number of fields in the JSON is not always the same


I am working with the Confluent Kafka platform. In order to produce messages to a topic, I have defined an Avro schema with 16 fields. Incoming records will only contain data for these 16 fields, but not necessarily for all of them. Using a DEFAULT value for each field is not helping, as pointed out in this discussion. What is the solution to this problem?

SAMPLE CODE:

// Minimal reproduction: produce one Avro-encoded record through kafka-rest.
var KafkaRest = require('kafka-rest');

// Schema: a record with a string `id` and a `data` array of `manyfields`
// records. Every nested field declares "NONE" as its default value.
// NOTE: the variable is named `AvroSchema` so that it matches the name passed
// to `topic.produce` below (it was previously declared under a different name).
var AvroSchema = new KafkaRest.AvroSchema({
    "name": "Mydata",
    "type": "record",
    "fields": [
        { "name": "id", "type": "string" },
        {
            "name": "data",
            "type": {
                "type": "array",
                "items": {
                    "name": "manyfields",
                    "type": "record",
                    "fields": [
                        { "name": "ip", "type": "string", "default": "NONE" },
                        { "name": "iptime", "type": "string", "default": "NONE" },
                        { "name": "mcc", "type": "string", "default": "NONE" },
                        { "name": "mnc", "type": "string", "default": "NONE" },
                        { "name": "cid", "type": "string", "default": "NONE" },
                        { "name": "lac", "type": "string", "default": "NONE" }
                    ]
                }
            }
        }
    ]
});

// `topic` is not defined in this snippet; presumably it is a kafka-rest topic
// handle created elsewhere.
// The nested record supplies only `ip` and `lac`; the remaining fields are
// omitted on purpose to show the "Expected field name not found" failure.
topic.produce(AvroSchema, {'id':'abcd','data': [{"ip":"12.12.12.12","lac":"1234"}]}, function (err, res) {
    if (err) {
        console.log(err);
    } else {
        console.log(res);
    }
});

error:

message: 'Conversion of JSON to Avro failed: Failed to convert JSON to Avro: Expected field name not found: iptime'

Any help appreciated!!


Solution

  • From reading the Avro specification, it seems defaults should let you do what you want:

    default: A default value for this field, used when reading instances that lack this field

    Maybe the deserializer you are using didn't implement this for JSON encoding. In the meantime, you can use avsc to work around this by automatically populating the missing fields:

    // Workaround: fill in the schema defaults on the client with avsc before
    // handing the record to kafka-rest.
    var KafkaRest = require('kafka-rest'),
        avro = require('avsc'); // load the avsc library (a bare `avsc` identifier is undefined)
    
    // Plain schema object shared by kafka-rest and avsc so both use the same definition.
    var attrs = {
      "name": "Mydata",
      "type": "record",
      "fields": [ /* ... */ ]
    };
    var AvroSchema = new KafkaRest.AvroSchema(attrs);
    var type = avro.parse(attrs);
    
    // The record as received: some fields declared in the schema are missing.
    var withoutDefaults = {'id': 'abcd','data': [{"ip":"12.12.12.12", "lac":"1234"}]};
    var withDefaults = type.clone(withoutDefaults); // All defaults are present here.
    
    // `topic` is not defined in this snippet; presumably a kafka-rest topic
    // handle created elsewhere.
    topic.produce(AvroSchema, withDefaults, function (err, res){
      if (err) { console.log(err); }
      else { console.log(res); }
    });