Search code examples
apache-kafkaapache-kafka-connect

Kafka-connect can't send schemaless JSON


I have a Zabbix-webhook service which sends data to kafka in JSON format:

  "value": {
  "name": "Zabbix server: Trend write cache, % used",
  "key": "zabbix[wcache,trend,pused]",
  "host": "Zabbix server",
  "groups": [
    "Zabbix servers"
  ],
  "applications": [],
  "itemid": 23276,
  "clock": 1670102276,
  "ns": 427825202,
  "value": 0.129509
}

I need to send these data to postgres via kafka-connect's JdbcSinkConnector:

curl -X POST "http://localhost:8082/connectors" -H "Content-Type: application/json" -d '{ 
    "name": "zabbix-sink-connector", 
    "config": { 
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", 
      "tasks.max": 1,
      "topics": "zabbix-webhook", 
      "connection.url": "jdbc:postgresql://<host>/etl?currentSchema=test", 
      "connection.user": "<username>", 
      "connection.password": "<password>", 
      "auto.create": "true"
    }
  }'

But when i'm trying to execute this curl, i get an error:

    ERROR WorkerSinkTask{id=zabbix-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Sink connector 'zabbix-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='zabbix-webhook',partition=1,offset=566370,timestamp=1670104965261) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask)
    org.apache.kafka.connect.errors.ConnectException: Sink connector 'zabbix-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='zabbix-webhook',partition=1,offset=566370,timestamp=1670104965261) with a HashMap value and null value schema 

According to this post https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#applying-schema I changed variable CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE to false, but I still get the same error.

How should I send these JSON-data?


Solution

  • The JDBC Sink connector requires a schema. It cannot accept JSON without value.converter.schemas.enable=true, for the JSONConverter. You cannot disable it.

    More specifically, the JDBC connector requires a schema because it cannot guarantee/extract known key names in plain JSON, or know they'll have consistent value types. It needs that information to create the SQL statements (either create/alter table DML, or insert/update/delete DDL) .

    Since you don't control the Kafka producer, in order to get schema'd data, you can use ksqlDB to convert consume JSON data into JSONSchema, or other binary structured format, after which you can use Connect to write to the database