I am trying to insert data with the Kafka JDBC Sink connector, but it is returning me this exception.
org.apache.kafka.connect.errors.DataException: Invalid null value for required INT64 field
The records have the following schema:
[
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"field": "ID"
},
{
"type": "int64",
"field": "TENANT_ID"
},
{
"type": "string",
"field": "ITEM"
},
{
"type": "int64",
"field": "TIPO"
},
{
"type": "int64",
"field": "BUSINESS_CONCEPT"
},
{
"type": "string",
"field": "ETIQUETA"
},
{
"type": "string",
"field": "VALOR"
},
{
"type": "string",
"field": "GG_T_TYPE"
},
{
"type": "string",
"field": "GG_T_TIMESTAMP"
},
{
"type": "string",
"field": "TD_T_TIMESTAMP"
},
{
"type": "string",
"field": "POS"
}
]
},
"payload": {
"ID": 298457,
"TENANT_ID": 83,
"ITEM": "0-0-0",
"TIPO": 4,
"BUSINESS_CONCEPT": null,
"ETIQUETA": "¿Cuándo ha ocurrido?",
"VALOR": "2019-05-31T10:33:00Z",
"GG_T_TYPE": "I",
"GG_T_TIMESTAMP": "2019-05-31 14:35:19.002221",
"TD_T_TIMESTAMP": "2019-06-05T10:46:55.0106",
"POS": "00000000530096832544"
}
}
]
As you can see, the value BUSINESS_CONCEPT
can be null
. It is the only null
value, so I suppose the exception is due to that field. How could I make the sink insert the value as null
?
You need to change the definition of
{
"type": "int64",
"field": "BUSINESS_CONCEPT"
}
to
{
"type": ["null", "int64"],
"field": "BUSINESS_CONCEPT"
}
in order to treat BUSINESS_CONCEPT
as optional field.