I created a sink connector from Kafka to MySQL. After adding a transform to the sink connector's config to drop some columns, I get this error, whereas without the transform it works:
(STRUCT) type doesn't have a mapping to the SQL database column type
{
  "name": "mysql-conf-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "topics": "mysql.cars.prices",
    "transforms": "dropPrefix,unwrap",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "mysql.cars.prices",
    "transforms.dropPrefix.replacement": "prices",
    "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.timestamp.target.type": "Timestamp",
    "transforms.timestamp.field": "date_time",
    "transforms.timestamp.format": "yyyy-MM-dd HH:mm:ss",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "connection.url": "jdbc:mysql://localhost:3306/product",
    "connection.user": "kafka",
    "connection.password": "123456",
    "transforms": "ReplaceField",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceField.blacklist": "id, brand",
    "insert.mode": "insert",
    "auto.create": "true",
    "auto.evolve": "true",
    "batch.size": 50000
  }
}
You have put the "transforms" key more than once in your JSON, which isn't valid.
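To see why a repeated key goes unnoticed, here is a minimal sketch using Python's standard `json` module. It only illustrates how a lenient parser treats duplicate keys; it is an assumption that the parser behind your Connect deployment behaves the same way, although the symptom in the question is consistent with the last value winning. The helper name `reject_duplicate_keys` is made up for this example.

```python
import json


def reject_duplicate_keys(pairs):
    """object_pairs_hook that raises if a JSON object repeats a key."""
    seen = {}
    for key, value in pairs:
        if key in seen:
            raise ValueError(f"duplicate key: {key!r}")
        seen[key] = value
    return seen


# Reduced version of the posted config: the same key appears twice.
raw = '{"transforms": "dropPrefix,unwrap", "transforms": "ReplaceField"}'

# Default behaviour: the last occurrence silently replaces the first.
lenient = json.loads(raw)

# Strict behaviour: surface the duplicate instead of hiding it.
try:
    json.loads(raw, object_pairs_hook=reject_duplicate_keys)
    duplicate_error = None
except ValueError as exc:
    duplicate_error = str(exc)

print(lenient["transforms"])  # ReplaceField
print(duplicate_error)        # duplicate key: 'transforms'
```

Running a check like this over the connector JSON before submitting it would catch the problem early.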
Try with a single entry that lists every transform, in the order they should run:
"transforms": "unwrap,ReplaceField,dropPrefix",
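You are getting the error because you have overridden the value, so unwrap, specifically, is no longer called and you still have nested Structs. Note also that the posted config never defines transforms.unwrap.type, and it configures a timestamp transform that is not in the list, so define or list those as needed.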
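The effect of skipping unwrap can be sketched with plain Python dicts standing in for Connect Structs. This is only a model, not the real transform code: it assumes the topic carries a change-event envelope with `before`/`after`/`op` fields, and the `price` column and all sample values are hypothetical. The functions `unwrap`, `replace_field`, and `nested_fields` are illustrative stand-ins.

```python
# Hypothetical change-event value; the before/after envelope is an assumption.
event = {
    "before": None,
    "after": {"id": 1, "brand": "audi", "price": 100,
              "date_time": "2021-01-01 00:00:00"},
    "op": "c",
}


def unwrap(value):
    """Stand-in for the unwrap transform: keep only the nested 'after' row."""
    return value["after"]


def replace_field(value, exclude):
    """Stand-in for ReplaceField$Value: drop the listed top-level fields."""
    return {k: v for k, v in value.items() if k not in exclude}


def nested_fields(value):
    """Names of fields whose values are still nested structs (dicts here)."""
    return [k for k, v in value.items() if isinstance(v, dict)]


exclude = {"id", "brand"}

# Only ReplaceField runs, as in the posted config after the override.
without_unwrap = replace_field(event, exclude)

# unwrap runs first, then ReplaceField, as in the suggested single entry.
with_unwrap = replace_field(unwrap(event), exclude)

print(nested_fields(without_unwrap))  # ['after']
print(nested_fields(with_unwrap))     # []
print(with_unwrap)
```

In the first case ReplaceField finds no top-level `id` or `brand` to drop, and the record still carries a nested `after` value, which is the kind of field a JDBC sink cannot map to a column type.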
The blacklist property was renamed to exclude, by the way: https://docs.confluent.io/platform/current/connect/transforms/replacefield.html#properties