Tags: mongodb, apache-kafka, apache-kafka-connect, confluent-schema-registry, debezium

Debezium MongoDB Connector Error: org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler


I am trying to deploy a new Debezium connector for MongoDB with a transform. The configuration looks like this:

{
    "name": "mongo_source_connector_autostate",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "tasks.max": 1,
        "initial.sync.max.threads": 4,
        "mongodb.hosts": "rs0/FE0VMC1980:27017",
        "mongodb.name": "mongo",
        "collection.whitelist": "DASMongoDB.*_AutoState",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
        "transforms.sanitize.field.names": true
    }
}

However, the connector fails with the following error:

 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.avro.SchemaParseException: Illegal initial character: 10019_AutoState
        at org.apache.avro.Schema.validateName(Schema.java:1528)
        at org.apache.avro.Schema.access$400(Schema.java:87)
        at org.apache.avro.Schema$Name.<init>(Schema.java:675)
        at org.apache.avro.Schema.createRecord(Schema.java:212)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:893)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:732)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:726)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more

I have started the connector in distributed mode with the following configuration:

...
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
...

Note: I have another connector without any transforms. It runs just fine.

I would like to get some help regarding this. Thanks in advance.


Solution

  • One of your fields seems to be violating the Avro naming rules. In your case it is 10019_AutoState. The Avro specification says:

    The name portion of a fullname, record field names, and enum symbols must:

    • start with [A-Za-z_]

    • subsequently contain only [A-Za-z0-9_]

    10019_AutoState violates the first rule because it starts with a digit. You can change it to something like AutoState10019.


    You can view the full list of record field naming constraints in the Avro specification.
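    As a quick sanity check before deploying, the naming constraint above can be expressed as a regular expression. This is only a sketch for pre-validating candidate names, not the validator Avro itself uses internally:

    ```python
    import re

    # Avro name rule: first character must be [A-Za-z_],
    # remaining characters must be [A-Za-z0-9_].
    AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

    def is_valid_avro_name(name: str) -> bool:
        """Return True if `name` satisfies Avro's naming rule."""
        return AVRO_NAME.fullmatch(name) is not None

    print(is_valid_avro_name("10019_AutoState"))   # False: starts with a digit
    print(is_valid_avro_name("AutoState10019"))    # True
    print(is_valid_avro_name("_10019_AutoState"))  # True: leading underscore is allowed
    ```

    A leading underscore also satisfies the rule, so prefixing rather than reordering the name is another option if the numeric prefix must be preserved.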