I have created a Debezium connector for a MySQL database running in a Docker container, and I tried to set a filter for messages:
{
"name": "my_connector",
"config": {
"name": "my_connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
...
"include.schema.changes": "true",
"transforms": "filter, unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.source.table == 'table-name' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))"
}
}
At http://localhost:8070/connectors/my_connector/status I see this:
{ "connector": { "state": "RUNNING", "worker_id": "172.21.0.13:8083" }, "name": "my_connector", "tasks": [ { "id": 0, "state": "FAILED", "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: io.debezium.DebeziumException: Error while evaluating expression 'value.source.table == 'subscription_contract' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))' for record 'SourceRecord{sourcePartition={server=subscription_contracts_db }, sourceOffset={file=binlog.000006, pos=19704, snapshot=true}} ConnectRecord{topic='subscription_contracts_db', kafkaPartition=0, key=Struct{databaseName=subscription-contracts}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, 
value=Struct{source=Struct{version=1.2.0.Final,connector=mysql,name=subscription_contracts_db,ts_ms=0,snapshot=true,db=subscription-contracts,table=subscription_contract,server_id=0,file=binlog.000006,pos=19704,row=0},databaseName=subscription-contracts,ddl=DROP TABLE IF EXISTS
subscription-contracts
.subscription_contract
}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}'\n\tat io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:116)\n\tat io.debezium.transforms.Filter.doApply(Filter.java:33)\n\tat io.debezium.transforms.ScriptingTransformation.apply(ScriptingTransformation.java:189)\n\tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 11 more\nCaused by: javax.script.ScriptException: org.apache.kafka.connect.errors.DataException: op is not a valid field name\n\tat org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:320)\n\tat org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:71)\n\tat java.scripting/javax.script.CompiledScript.eval(CompiledScript.java:89)\n\tat io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:107)\n\t... 
16 more\nCaused by: org.apache.kafka.connect.errors.DataException: op is not a valid field name\n\tat org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)\n\tat org.apache.kafka.connect.data.Struct.get(Struct.java:74)\n\tat jdk.internal.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:107)\n\tat groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:323)\n\tat org.codehaus.groovy.runtime.metaclass.MethodMetaProperty$GetMethodMetaProperty.getProperty(MethodMetaProperty.java:62)\n\tat org.codehaus.groovy.runtime.callsite.GetEffectivePojoPropertySite.getProperty(GetEffectivePojoPropertySite.java:63)\n\tat org.codehaus.groovy.runtime.callsite.AbstractCallSite.callGetProperty(AbstractCallSite.java:329)\n\tat Script9.run(Script9.groovy:1)\n\tat org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)\n\t... 19 more\n", "worker_id": "172.21.0.13:8083" } ], "type": "source" }
As OneCricketeer pointed out, the basic issue here is:
Caused by: javax.script.ScriptException: org.apache.kafka.connect.errors.DataException: op is not a valid field name
But I am not sure what is wrong with using op, since it seems to be a valid field (see here).
After some investigation, I seem to have found the answer. I hope it helps someone else.
My connector configuration contained this setting:
"include.schema.changes": "true"
This makes the connector also emit events for schema changes (DDL statements) in the database, not only for row-level changes.
I have a separate Docker container, a migrator, that initializes the database by running some Flyway migrations. One of those migrations is the DROP TABLE statement that appears in the exception above.
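A schema change message has no reason to contain an op field, so it simply doesn't have one (as shown in the example here). In the trace above, the failing record has the value schema io.debezium.connector.mysql.SchemaChangeValue and carries only source, databaseName and ddl.
When the filter evaluates value.op on such a record, the field lookup fails and throws the DataException.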
Changing include.schema.changes to "false" solved the issue.
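If schema change events need to stay enabled, one possible alternative is to guard the condition so that it only reads op when the record's value schema actually defines that field. The fragment below is a sketch and has not been tested against this setup. It assumes the valueSchema variable that the Debezium filter exposes to the expression, and the Kafka Connect Schema.field(name) method, which returns null for an unknown field instead of throwing:

```json
{
  "include.schema.changes": "true",
  "transforms": "filter,unwrap",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "valueSchema.field('op') != null && value.source.table == 'table-name' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))"
}
```

Note that the filter drops every record for which the condition is false, so with this guard the schema change events no longer crash the task, but they are also removed from the output. If they should reach Kafka untouched, the filter's topic.regex option may be a better fit, since it limits the condition to matching topics. Check the documentation for your Debezium version to confirm it is available.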