mysql json apache-kafka apache-kafka-connect debezium

Caused by: io.debezium.text.ParsingException: extraneous input 'ASC' expecting


I am running a Kafka source connector, but the connector status shows the following error:

{"name":"supplier-central","connector":{"state":"RUNNING","worker_id":"192.168.208.4:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"192.168.208.4:8083","trace":"org.apache.kafka.connect.errors.ConnectException: extraneous input 'ASC' expecting {<EOF>, '--'}\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:208)\n\tat io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:508)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1095)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:943)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: io.debezium.text.ParsingException: extraneous input 'ASC' expecting {<EOF>, '--'}\n\tat io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:40)\n\tat org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)\n\tat org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.reportUnwantedToken(DefaultErrorStrategy.java:349)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.singleTokenDeletion(DefaultErrorStrategy.java:513)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:238)\n\tat io.debezium.ddl.parser.mysql.generated.MySqlParser.root(MySqlParser.java:817)\n\tat io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.parseTree(MySqlAntlrDdlParser.java:68)\n\tat io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.parseTree(MySqlAntlrDdlParser.java:41)\n\tat io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:80)\n\tat io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:307)\n\tat io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:694)\n\tat io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:492)\n\t... 5 more\n"}],"type":"source"}
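
The status document above is long, and the useful part is the "Caused by" line inside the task trace. A minimal sketch for pulling it out, assuming the status JSON has already been fetched and decoded into a dict; the helper name failed_task_causes and the shortened sample trace are illustrative and not part of the original post:

```python
def failed_task_causes(status):
    """Return (task id, root-cause line) for every task whose state is FAILED."""
    causes = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        lines = task.get("trace", "").split("\n")
        # Prefer the last "Caused by:" line; fall back to the first trace line.
        caused_by = [ln for ln in lines if ln.startswith("Caused by:")]
        causes.append((task["id"], caused_by[-1] if caused_by else lines[0]))
    return causes


# Shortened, illustrative copy of the status document from the question.
sample = {
    "name": "supplier-central",
    "connector": {"state": "RUNNING"},
    "tasks": [{
        "id": 0,
        "state": "FAILED",
        "trace": (
            "org.apache.kafka.connect.errors.ConnectException: "
            "extraneous input 'ASC' expecting {<EOF>, '--'}\n"
            "\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n"
            "Caused by: io.debezium.text.ParsingException: "
            "extraneous input 'ASC' expecting {<EOF>, '--'}\n"
            "\tat io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:40)\n"
        ),
    }],
}

for task_id, cause in failed_task_causes(sample):
    print(f"task {task_id}: {cause}")
```

Here the connector itself reports RUNNING while task 0 is FAILED, so the check has to look at the per-task state rather than the connector state.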

The Debezium logs show the following:

2019-08-23 05:02:40,101 INFO   MySQL|data_lake|task  [Consumer clientId=supplier-central-dbhistory, groupId=supplier-central-dbhistory] Member supplier-central-dbhistory-41cab001-1c64-4ab2-8869-58dca22b783c sending LeaveGroup request to coordinator kafka:9092 (id: 2147483646 rack: null)   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
Aug 23, 2019 5:02:41 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to 52.76.148.206:3306 at mysql-bin.010785/66551561 (sid:425, cid:315812)
2019-08-23 05:02:41,200 INFO   ||  WorkerSourceTask{id=supplier-central-0} Source task finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-08-23 05:02:41,841 INFO   ||  WorkerSourceTask{id=supplier-central-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-08-23 05:02:41,841 INFO   ||  WorkerSourceTask{id=supplier-central-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-08-23 05:02:41,841 ERROR  ||  WorkerSourceTask{id=supplier-central-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
2019-08-23 05:02:41,841 ERROR  ||  WorkerSourceTask{id=supplier-central-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
2019-08-23 05:02:41,859 INFO   MySQL|data_lake|task  [Producer clientId=supplier-central-dbhistory] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.   [org.apache.kafka.clients.producer.KafkaProducer]

I am not using Schema Registry or Avro. The source database is MySQL.

My other source connector works fine, and I cannot identify the cause of this error. The source database belongs to a third party, so someone may have changed something in it. As far as I understand, though, the Kafka connector also handles such changes through the binlog, so that may not be the issue.

Can anyone tell me what the problem is and how to fix it?

Connector configuration:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:38083/connectors/ \
    -d '{
    "name": "supplier-central",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "ankitg",
        "snapshot.mode": "initial",
        "include.schema.changes": "true",
        "database.password": "abc@123",
        "database.server.id": "425",
        "database.server.name": "data_lake",
        "database.whitelist": "supplier",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "history.supplier_central",
        "table.whitelist": "supplier_central.suppliers,supplier_central.supplier_business_types,supplier_central.supplier_address,supplier_central.supplier_banks,supplier_central.supplier_profile,supplier_central.supplier_documents",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}'

Solution

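  • I got this error when I used a different database name and a different table name in the configuration. Check that database.whitelist and table.whitelist match each other and are configured correctly. In the configuration above, database.whitelist is set to supplier while every table.whitelist entry is prefixed with supplier_central.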
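
The consistency check described above can be sketched in a few lines. This is only an illustration under stated assumptions: Debezium treats both properties as comma-separated regular expressions, while this sketch compares them as literal names, and the helper name mismatched_tables is made up for the example.

```python
def mismatched_tables(config):
    """Return table.whitelist entries whose database is not in database.whitelist.

    Assumption: both properties hold plain names, not regular expressions.
    """
    databases = {
        name.strip()
        for name in config.get("database.whitelist", "").split(",")
        if name.strip()
    }
    if not databases:
        # No database whitelist set, so there is nothing to compare against.
        return []
    mismatched = []
    for entry in config.get("table.whitelist", "").split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Entries are written as databaseName.tableName.
        database = entry.split(".", 1)[0]
        if database not in databases:
            mismatched.append(entry)
    return mismatched


# The two relevant properties, copied from the question's configuration.
config = {
    "database.whitelist": "supplier",
    "table.whitelist": (
        "supplier_central.suppliers,supplier_central.supplier_business_types,"
        "supplier_central.supplier_address,supplier_central.supplier_banks,"
        "supplier_central.supplier_profile,supplier_central.supplier_documents"
    ),
}

for table in mismatched_tables(config):
    print("not covered by database.whitelist:", table)
```

With the values from the question, every table entry is reported. Whether supplier or supplier_central is the correct database name depends on the actual schema, which the post does not state.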