Search code examples
jdbc, apache-kafka, apache-kafka-connect

dropfield transform sink connector: (STRUCT) type doesn't have a mapping to the SQL database column type


I created a sink connector from Kafka to MySQL. After adding a transform to the sink connector's config to drop some columns, I get this error, whereas without the transform it works:

(STRUCT) type doesn't have a mapping to the SQL database column type

{
    "name": "mysql-conf-sink",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "3",

      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://localhost:8081",
      "topics": "mysql.cars.prices",
      "transforms": "dropPrefix,unwrap",

      "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.dropPrefix.regex": "mysql.cars.prices",
      "transforms.dropPrefix.replacement": "prices", 
  
      "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.timestamp.target.type": "Timestamp",
      "transforms.timestamp.field": "date_time",
      "transforms.timestamp.format": "yyyy-MM-dd HH:mm:ss",
  

      "errors.tolerance": "all",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true",
  
  
      "connection.url": "jdbc:mysql://localhost:3306/product",
      "connection.user": "kafka",
      "connection.password": "123456",
  
      "transforms": "ReplaceField",
      "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.ReplaceField.blacklist": "id, brand",


            
      "insert.mode": "insert",

      "auto.create": "true",
      "auto.evolve": "true",

      "batch.size": 50000


    }
  }

Solution

  • You have put the "transforms" key more than once in your JSON, which isn't valid.

    Try with one entry

    "transforms": "unwrap,ReplaceField,dropPrefix",
    

    You are getting the error because the later "transforms" entry has overridden the earlier one, so unwrap, specifically, is no longer called, and you are left with nested Structs.

    The blacklist property got renamed to exclude, by the way - https://docs.confluent.io/platform/current/connect/transforms/replacefield.html#properties