apache-kafka avro confluent-platform confluent-schema-registry confluent-cloud

org.apache.avro.AvroTypeException: Unknown union branch EventId


I am trying to convert JSON to Avro using 'kafka-avro-console-producer' and publish it to a Kafka topic.

This works for flat JSON and schemas, but for the schema and JSON given below I get the error "org.apache.avro.AvroTypeException: Unknown union branch EventId".

Any help would be appreciated.

Schema :

{
    "type": "record",
    "name": "Envelope",
    "namespace": "CoreOLTPEvents.dbo.Event",
    "fields": [{
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "Value",
            "fields": [{
                "name": "EventId",
                "type": "long"
            }, {
                "name": "CameraId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "SiteId",
                "type": ["null", "long"],
                "default": null
            }],
            "connect.name": "CoreOLTPEvents.dbo.Event.Value"
        }],
        "default": null
    }, {
        "name": "after",
        "type": ["null", "Value"],
        "default": null
    }, {
        "name": "op",
        "type": "string"
    }, {
        "name": "ts_ms",
        "type": ["null", "long"],
        "default": null
    }],
    "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}

The JSON input is:

{
    "before": null,
    "after": {
        "EventId": 12,
        "CameraId": 10,
        "SiteId": 11974
    },
    "op": "C",
    "ts_ms": null
}

In my case I can't alter the schema. I can only alter the JSON so that it works with the schema as it is.


Solution

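  • If you are using the Avro JSON format, your input is slightly off. In Avro's JSON encoding, a non-null union value must be wrapped in a JSON object whose single key names the branch type: the type name for primitives (for example "long"), or the full, namespace-qualified name for records: https://avro.apache.org/docs/current/spec.html#json_encoding

    In your input the decoder reads "EventId", the first key inside "after", as the branch name, which is why it reports "Unknown union branch EventId". The "Value" record inherits the namespace of the enclosing "Envelope" record, so its full name is CoreOLTPEvents.dbo.Event.Value.

    See below for an example which I think should work.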

    {
        "before": null,
        "after": {
            "CoreOLTPEvents.dbo.Event.Value": {
                "EventId": 12,
                "CameraId": {
                    "long": 10
                },
                "SiteId": {
                    "long": 11974
                }
            }
        },
        "op": "C",
        "ts_ms": null
    }
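
Since the schema cannot be changed, the wrapping can also be done programmatically. Below is a minimal Python sketch that walks the schema and rewrites plain JSON into the union-wrapped form. The helpers `full_name`, `collect_records`, `branch_label` and `encode` are hypothetical names made up for this illustration, not part of any Avro library. It assumes every union has the form ["null", T], that record names contain no dots, and that all fields are present in the input; arrays, maps and multi-branch unions are not handled.

```python
import json

PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}

def full_name(name, namespace):
    # Assumption: names without dots inherit the enclosing namespace.
    return name if "." in name or not namespace else f"{namespace}.{name}"

def collect_records(schema, namespace, found):
    """Index every record definition by its full name, with its namespace."""
    if isinstance(schema, list):
        for branch in schema:
            collect_records(branch, namespace, found)
    elif isinstance(schema, dict) and schema.get("type") == "record":
        ns = schema.get("namespace", namespace)
        found[full_name(schema["name"], ns)] = (schema, ns)
        for field in schema["fields"]:
            collect_records(field["type"], ns, found)
    return found

def branch_label(branch, namespace):
    """Key that Avro's JSON encoding uses for a non-null union branch."""
    if isinstance(branch, str):
        return branch if branch in PRIMITIVES else full_name(branch, namespace)
    if "name" in branch:  # inline named type such as a record
        return full_name(branch["name"], branch.get("namespace", namespace))
    return branch["type"]

def encode(schema, datum, namespace, records):
    """Rewrite plain JSON `datum` into Avro JSON encoding for `schema`."""
    if isinstance(schema, list):  # union: assumed to be ["null", T]
        if datum is None:
            return None
        branch = next(b for b in schema if b != "null")
        return {branch_label(branch, namespace): encode(branch, datum, namespace, records)}
    if isinstance(schema, str):
        if schema in PRIMITIVES:
            return datum
        # Reference to a previously defined record, e.g. "Value".
        schema, namespace = records[full_name(schema, namespace)]
    if isinstance(schema, dict) and schema.get("type") == "record":
        ns = schema.get("namespace", namespace)
        return {f["name"]: encode(f["type"], datum[f["name"]], ns, records)
                for f in schema["fields"]}
    return datum  # anything else is passed through unchanged

# Schema and input from the question (connect.name attributes omitted).
SCHEMA = json.loads("""
{"type": "record", "name": "Envelope", "namespace": "CoreOLTPEvents.dbo.Event",
 "fields": [
  {"name": "before", "type": ["null", {"type": "record", "name": "Value", "fields": [
     {"name": "EventId", "type": "long"},
     {"name": "CameraId", "type": ["null", "long"], "default": null},
     {"name": "SiteId", "type": ["null", "long"], "default": null}]}],
   "default": null},
  {"name": "after", "type": ["null", "Value"], "default": null},
  {"name": "op", "type": "string"},
  {"name": "ts_ms", "type": ["null", "long"], "default": null}]}
""")

PLAIN = {"before": None,
         "after": {"EventId": 12, "CameraId": 10, "SiteId": 11974},
         "op": "C", "ts_ms": None}

records = collect_records(SCHEMA, None, {})
encoded = encode(SCHEMA, PLAIN, None, records)
print(json.dumps(encoded))  # one JSON document on a single line
```

For the input from the question this produces the same structure as the hand-written JSON above. Printing it on a single line is deliberate, since as far as I know the console producer reads one record per line.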