Kafka JDBC sink not handling null values


I am trying to insert data with the Kafka JDBC Sink connector, but it throws this exception:

org.apache.kafka.connect.errors.DataException: Invalid null value for required INT64 field

The records have the following schema:

[
  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "int64",
          "field": "ID"
        },
        {
          "type": "int64",
          "field": "TENANT_ID"
        },
        {
          "type": "string",
          "field": "ITEM"
        },
        {
          "type": "int64",
          "field": "TIPO"
        },
        {
          "type": "int64",
          "field": "BUSINESS_CONCEPT"
        },
        {
          "type": "string",
          "field": "ETIQUETA"
        },
        {
          "type": "string",
          "field": "VALOR"
        },
        {
          "type": "string",
          "field": "GG_T_TYPE"
        },
        {
          "type": "string",
          "field": "GG_T_TIMESTAMP"
        },
        {
          "type": "string",
          "field": "TD_T_TIMESTAMP"
        },
        {
          "type": "string",
          "field": "POS"
        }
      ]
    },
    "payload": {
      "ID": 298457,
      "TENANT_ID": 83,
      "ITEM": "0-0-0",
      "TIPO": 4,
      "BUSINESS_CONCEPT": null,
      "ETIQUETA": "¿Cuándo ha ocurrido?",
      "VALOR": "2019-05-31T10:33:00Z",
      "GG_T_TYPE": "I",
      "GG_T_TIMESTAMP": "2019-05-31 14:35:19.002221",
      "TD_T_TIMESTAMP": "2019-06-05T10:46:55.0106",
      "POS": "00000000530096832544"
    }
  }
]

As you can see, BUSINESS_CONCEPT can be null. It is the only null value in the record, so I assume it is the field causing the exception. How can I make the sink insert it as null?
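
One way to confirm which field triggers the exception is to compare the payload against the schema and list every field that is null but not declared optional. The following is only a sketch: it assumes the record is in the JSON schema/payload envelope shown above, `null_required_fields` is a hypothetical helper name, and the sample record is trimmed to three fields for brevity.

```python
import json


def null_required_fields(record):
    """Return the names of fields that are null in the payload
    but not marked optional in the schema (hypothetical helper)."""
    fields = record["schema"]["fields"]
    payload = record["payload"]
    return [
        f["field"]
        for f in fields
        # A field without an "optional" key is treated as required.
        if not f.get("optional", False) and payload.get(f["field"]) is None
    ]


# Trimmed copy of the record from the question.
record = json.loads("""
{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int64", "field": "ID"},
      {"type": "int64", "field": "BUSINESS_CONCEPT"},
      {"type": "string", "field": "ITEM"}
    ]
  },
  "payload": {"ID": 298457, "BUSINESS_CONCEPT": null, "ITEM": "0-0-0"}
}
""")

print(null_required_fields(record))  # ['BUSINESS_CONCEPT']
```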


Solution

  • You need to change the definition of

    {
      "type": "int64",
      "field": "BUSINESS_CONCEPT"
    }
    

    to

    {
      "type": ["null", "int64"],
      "field": "BUSINESS_CONCEPT"
    }
    

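    so that BUSINESS_CONCEPT is treated as an optional field. Note that the union form above is Avro schema syntax; in the JSON schema/payload envelope shown in the question, a field is marked nullable by adding "optional": true to its definition instead.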
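
For records in the JSON schema/payload envelope shown in the question, Kafka Connect expresses nullability with an "optional": true flag on the field. Below is a minimal Python sketch of applying that flag. `mark_optional` is a hypothetical helper, and where the schema is actually generated depends on the upstream producer, which the question does not show.

```python
import copy


def mark_optional(schema, field_names):
    """Return a copy of a Connect JSON struct schema with the named
    fields flagged as optional (hypothetical helper)."""
    updated = copy.deepcopy(schema)  # leave the caller's schema untouched
    for f in updated["fields"]:
        if f["field"] in field_names:
            f["optional"] = True
    return updated


# Trimmed schema from the question.
schema = {
    "type": "struct",
    "fields": [
        {"type": "int64", "field": "ID"},
        {"type": "int64", "field": "BUSINESS_CONCEPT"},
    ],
}

fixed = mark_optional(schema, {"BUSINESS_CONCEPT"})
print(fixed["fields"][1])
```

With the flag set, a null BUSINESS_CONCEPT in the payload no longer conflicts with the schema. The target column must also accept NULL for the insert to succeed.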