Tags: apache-kafka, avro, apache-kafka-connect, confluent-schema-registry, debezium

AvroConverter fails to serialize nested structures using the latest schema version


I'm trying to set up a Debezium connector to capture the changes on a MongoDB collection and stream those changes to a Kafka topic.

Everything works great (inserts, updates, deletes/tombstones) until I introduced Schema Registry and Avro schemas into the configuration.

I'm trying to use a schema, already registered for the topic, that contains nested structures. Here is an example of the schema registered in Schema Registry:

{
  "name": "User",
  "namespace": "com.namespace.models",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "default": null,
      "name": "version",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "default": null,
      "name": "address",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "street",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "city",
              "type": [
                "null",
                "string"
              ]
            }
          ],
          "name": "Address",
          "namespace": "com.myothernamespace.models",
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "otherAddress",
      "type": [
        "null",
        "com.myothernamespace.models.Address"
      ]
    }
  ]
}

When the connector tries to write to the topic, the AvroConverter fails with the following exception:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic.. 

I can see in the logs that the connector tries to produce the record with a different schema, one that does not use the correct namespace for the nested structure (address). It also fails to serialize the otherAddress field: the field is null in the Mongo collection, so the schema produced by the AvroConverter declares it as a union of (null, string) instead of a union of (null, Address).

The id field fails as well, because the produced schema defines it as optional.

This causes a mismatch between the schema registered in Schema Registry and the schema the producer is trying to use.

Here is the mismatched schema the converter uses to produce the records:

{
  "name": "User",
  "namespace": "com.namespace.models",
  "type": "record",
  "fields": [
    {
      "default": null,
      "name": "id",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "version",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "default": null,
      "name": "address",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "street",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "city",
              "type": [
                "null",
                "string"
              ]
            }
          ],
          "name": "address",        
          "namespace": "<my_topic_name>", # It's using my topic name as namespace
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "otherAddress",
      "type": [
        "null",
        "string" # not using the correct structure (Address)
      ]
    }
  ]
}
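To pinpoint the differences between the registered schema and the produced one, here is a minimal Python sketch (stdlib only) that compares their top-level fields. The helpers `describe` and `diff_fields` are hypothetical names for illustration, the schemas are trimmed copies of the two above (with the `#` annotations removed, since JSON has no comments), and only records, primitives, unions and named-type references are handled.

```python
def describe(avro_type, namespace=None):
    """Render an Avro type as a short string so two schemas can be compared."""
    if isinstance(avro_type, str):
        return avro_type  # primitive, or a reference to an already defined named type
    if isinstance(avro_type, list):  # a union such as ["null", "string"]
        return "union(" + ", ".join(describe(t, namespace) for t in avro_type) + ")"
    if avro_type.get("type") == "record":
        # A nested record without its own namespace inherits the enclosing one.
        ns = avro_type.get("namespace", namespace)
        name = avro_type["name"]
        return f"record {ns}.{name}" if ns else f"record {name}"
    return str(avro_type.get("type"))


def diff_fields(registered, produced):
    """Return (field, registered type, produced type) for each top-level field that differs."""
    reg = {f["name"]: describe(f["type"], registered.get("namespace")) for f in registered["fields"]}
    prod = {f["name"]: describe(f["type"], produced.get("namespace")) for f in produced["fields"]}
    diffs = []
    for name in sorted(reg.keys() | prod.keys()):
        a, b = reg.get(name, "<missing>"), prod.get(name, "<missing>")
        if a != b:
            diffs.append((name, a, b))
    return diffs


ADDRESS_FIELDS = [
    {"name": "street", "type": ["null", "string"], "default": None},
    {"name": "city", "type": ["null", "string"], "default": None},
]

# Schema registered in Schema Registry (from the question).
REGISTERED = {
    "type": "record", "name": "User", "namespace": "com.namespace.models",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "version", "type": ["null", "int"], "default": None},
        {"name": "address", "default": None, "type": ["null", {
            "type": "record", "name": "Address",
            "namespace": "com.myothernamespace.models", "fields": ADDRESS_FIELDS}]},
        {"name": "otherAddress", "default": None,
         "type": ["null", "com.myothernamespace.models.Address"]},
    ],
}

# Schema the converter produced (from the question; <my_topic_name> is a placeholder).
PRODUCED = {
    "type": "record", "name": "User", "namespace": "com.namespace.models",
    "fields": [
        {"name": "id", "type": ["null", "string"], "default": None},
        {"name": "version", "type": ["null", "int"], "default": None},
        {"name": "address", "default": None, "type": ["null", {
            "type": "record", "name": "address",
            "namespace": "<my_topic_name>", "fields": ADDRESS_FIELDS}]},
        {"name": "otherAddress", "default": None, "type": ["null", "string"]},
    ],
}

if __name__ == "__main__":
    for field, reg_type, prod_type in diff_fields(REGISTERED, PRODUCED):
        print(f"{field}: registered={reg_type} produced={prod_type}")
```

Running it reports the three fields described above (address, id and otherAddress), while version matches in both schemas.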

I was able to fix the namespace of the root structure with the SetSchemaMetadata SMT, but AFAIK it cannot be applied to nested structures.

Here is my connector config:

{
  "name": "test-connector",
  "config": {
    "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.members.auto.discover": true,
    "mongodb.connection.string": "<redacted>",
    "mongodb.user" : "<redacted>",
    "mongodb.password" : "<redacted>",
    "database.include.list": "<redacted>",
    "collection.include.list": "<redacted>",
    "mongodb.ssl.enabled": true,   
    "publish.full.document.only": true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.enhanced.avro.schema.support": "true",
    "value.converter.schema.registry.url": "<redacted>",
    "value.converter.auto.register.schemas": "false",
    "value.converter.use.latest.version": "true",
    "value.converter.connect.meta.data": "false",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "<redacted>",
    "transforms": "unwrap,ReplaceField,extractIdAsKey,SetValueSchema",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceField.blacklist": "_id",
    "transforms.extractIdAsKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractIdAsKey.field": "id",
    "transforms.SetValueSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.SetValueSchema.schema.name": "com.namespace.models.User",
    "transforms.SetValueSchema.predicate": "isTombstone",
    "transforms.SetValueSchema.negate": "true",
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "topic.prefix": "test",
    "tasks.max" : "1"
  }
}

I expected that, by using Schema Registry with use.latest.version, the AvroConverter would use the registered schema to produce the records. Does anyone have an idea what causes this mismatch between the schema registered in Schema Registry and the schema the AvroConverter uses, and how to overcome it?


Solution

  • I've solved this issue and identified the root cause. The problem lies in the conversion from BsonDocument to SourceRecord. AFAIK, Debezium offers no way to provide a schema for this conversion, so the schema is derived from the data available in the BsonDocument extracted from the change stream (fullDocument). That inferred schema (optional fields, nested records named after the topic, null fields typed as string) differs from the registered one, which is why it fails when it then tries to convert the SourceRecord to Avro.

    I ended up using the MongoDB Kafka source connector (MongoSourceConnector), because it lets you provide the schema used for the conversion from BsonDocument to SourceRecord (output.schema.value). The MongoDB source connector added support for Avro schema namespaces in version 1.7.

    I used version 1.9.0, which also ships with tombstone support, and got the connector working with the latest schema version registered in Schema Registry; the whole conversion from BsonDocument to Avro now works properly.

    https://www.mongodb.com/docs/kafka-connector/current/whats-new/#std-label-kafka-connector-whats-new-1.9
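Because output.schema.value takes the Avro schema as a string inside the connector's JSON config, escaping it by hand is error-prone. Below is a minimal Python sketch, assuming the same Kafka Connect REST payload shape used in the question, that builds a MongoSourceConnector config by serializing the registered User schema with json.dumps. The helper name build_connector_config is hypothetical, the <redacted> values are placeholders, and the exact property set (in particular publish.full.document.only.tombstone.on.delete for the 1.9 tombstone support) is an assumption to check against the MongoDB Kafka connector documentation for your version.

```python
import json

# Registered Avro schema from the question, as a Python dict.
USER_SCHEMA = {
    "type": "record",
    "name": "User",
    "namespace": "com.namespace.models",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "version", "type": ["null", "int"], "default": None},
        {
            "name": "address",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "Address",
                    "namespace": "com.myothernamespace.models",
                    "fields": [
                        {"name": "street", "type": ["null", "string"], "default": None},
                        {"name": "city", "type": ["null", "string"], "default": None},
                    ],
                },
            ],
            "default": None,
        },
        {
            "name": "otherAddress",
            "type": ["null", "com.myothernamespace.models.Address"],
            "default": None,
        },
    ],
}


def build_connector_config(name, schema, registry_url):
    """Hypothetical helper: return a Connect REST payload for the MongoDB source connector.

    output.schema.value expects the Avro schema as a string, so the dict is
    serialized with json.dumps instead of being escaped by hand.
    """
    return {
        "name": name,
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": "<redacted>",
            "database": "<redacted>",
            "collection": "<redacted>",
            "publish.full.document.only": "true",
            # Assumed 1.9+ option for emitting tombstones on delete; verify for your version.
            "publish.full.document.only.tombstone.on.delete": "true",
            # Use the supplied schema instead of inferring one from each document.
            "output.format.value": "schema",
            "output.schema.value": json.dumps(schema),
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": registry_url,
            "value.converter.auto.register.schemas": "false",
            "value.converter.use.latest.version": "true",
        },
    }


if __name__ == "__main__":
    payload = build_connector_config("test-connector", USER_SCHEMA, "<redacted>")
    print(json.dumps(payload, indent=2))
```

The printed payload can then be POSTed to the Kafka Connect REST API (the /connectors endpoint). Keeping the schema as a dict and serializing it at build time means the string in the config always stays in sync with the schema you register.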