apache-kafka, apache-kafka-connect, confluent-platform, jsonpath, debezium

Kafka connector config error: filter.condition: Invalid json path defined


I'm trying to use Confluent's Filter SMT with the Debezium unwrap-smt example.

I added the following settings to the source connector (Debezium MySQL) config:

    "transforms": "route,csFilter",
    ...
    ...
    "transforms.csFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.csFilter.filter.condition": "$.payload.after.source == 2",
    "transforms.csFilter.filter.type": "exclude",
    "transforms.csFilter.missing.or.null.behavior": "fail"

Since this Filter SMT is provided by Confluent, I downloaded it and copied its jar files (connect-transforms, connect-utils, json-path) to the path-to-kafka/connect/debezium-connector-mysql directory.

When I tried to register the Debezium MySQL source connector:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
        localhost:8083/connectors/ -d @source_connector_config.json

I got this error:

    {"error_code":400,
    "message":"Connector configuration is invalid and contains the following 1 error(s):\n
    Invalid value $.payload.after.source == 2 for configuration filter.condition: Invalid json path defined.
    Please refer to https://github.com/json-path/JsonPath README for correct use of json path.\n
    You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

I checked the JSON path expression with the examples provided in this guide. It seemed okay.

Can you please point me in the right direction? What am I missing? Thanks.


Solution

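  • Try this condition instead: $.payload.after[?(@.source == 2)]
    In JsonPath, a comparison has to sit inside a filter predicate, [?(...)], rather than follow the path directly.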
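
For reference, here is a sketch of how the transform settings from the question might look with that condition applied. Only the filter.condition value differs; the other lines are copied from the question unchanged. Whether $.payload.after matches the actual record structure depends on the converter and unwrap settings in use, so treat that part of the path as an assumption:

    "transforms.csFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.csFilter.filter.condition": "$.payload.after[?(@.source == 2)]",
    "transforms.csFilter.filter.type": "exclude",
    "transforms.csFilter.missing.or.null.behavior": "fail"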