apache-kafka ksqldb

org.apache.kafka.common.errors.SerializationException: Failed to deserialize key


I am trying to create a TABLE in KSQL from a Kafka topic. The table is created successfully, but when I try to select data from it, I get this error:

org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT

Data in Topic:

Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/11/15 14:03:55.013 Z, key: {"MESG_SEQ_NO":1015}, value: {"MESG_SEQ_NO":1015,"MESSAGENO":"1015","MESSAGECREATIONDATETIME":1554336000000,"OPCO_GLN":"L","OPCO_COUNTRYCODE":"L","STORENO":5809,"SRVPNO":320,"ENDDATETIMETRANSACTION":1554336000000,"DATETIMESENTSTORE":1554336000000,"MESG_ACTION":"I","MESG_IND_COMPLETED":"N","MESG_IND_SENTTOBROKER":"N","MESG_ARTCLE_SALES_SEQ_NO":1015,"NASANUMBER":584623,"LKARNUMBER":null,"AMOUNTCE":1,"AMOUNTWEIGHT":null,"VOLUME":null,"AMOUNTCEDISCOUNT":null,"AMOUNTCEPRICED":null,"AMOUNTWEIGHTDISCOUNT":null,"AMOUNTWEIGHTPRICED":null,"SALEVALUE":null,"DISCOUNTVALUE":null,"PROMOTIONVALUE":null}, partition: 0

Table Structure

ksql> CREATE TABLE MSG_2639_SALES_TRNS_STORES_TABLE_S( MESG_SEQ_NO INT(4,0) PRIMARY KEY, MESSAGENO STRING, MESSAGECREATIONDATETIME TIMESTAMP,
>OPCO_GLN STRING, OPCO_COUNTRYCODE STRING, STORENO INT(4,0), SRVPNO INT(4,0),
>ENDDATETIMETRANSACTION TIMESTAMP, DATETIMESENTSTORE TIMESTAMP, MESG_ACTION STRING, MESG_IND_COMPLETED STRING, MESG_IND_SENTTOBROKER STRING)
>with (kafka_topic='ah_topics_2639_sales' , key_format='JSON',  value_format='JSON');
 Message
---------------
 Table created
---------------


Selecting Data :

ksql> select * from MSG_2639_SALES_TRNS_STORES_TABLE_S emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|MESG_SEQ_NO |MESSAGENO   |MESSAGECREAT|OPCO_GLN    |OPCO_COUNTRY|STORENO     |SRVPNO      |ENDDATETIMET|DATETIMESENT|MESG_ACTION |MESG_IND_COM|MESG_IND_SEN|
|            |            |IONDATETIME |            |CODE        |            |            |RANSACTION  |STORE       |            |PLETED      |TTOBROKER   |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+

I don't get any data, and I see this error in the logs: org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT


Solution

  • You declared MESG_SEQ_NO INT(4,0) PRIMARY KEY, but the record key {"MESG_SEQ_NO":1015} is a struct (a JSON object), not an integer. ksqlDB does not automatically extract the JSON field by name and match it to the key column.

    That is why the error says Can't convert type. sourceType: ObjectNode, requiredType: BIGINT

    Last I checked, keys in ksqlDB should ideally be primitive types rather than structured objects. If the key is structured, you need to declare that explicitly - https://docs.ksqldb.io/en/latest/reference/serialization/#deserialization-of-single-keys
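
    As a rough sketch of what "being explicit" could look like: declare the key column as a STRUCT whose field matches the JSON key, then read the number out of it with ->. This assumes a ksqlDB version that supports STRUCT key columns with key_format='JSON'. The column name K is a hypothetical name chosen for illustration, only a few value columns are shown, and plain BIGINT/INT types are assumed in place of INT(4,0). I have not run this against your topic.

```sql
-- Sketch only: K is a hypothetical column name; types are assumptions.
-- The key column is a STRUCT, so the JSON object {"MESG_SEQ_NO":1015}
-- can be deserialized as-is instead of being coerced to BIGINT.
CREATE TABLE MSG_2639_SALES_TRNS_STORES_TABLE_S (
    K STRUCT<MESG_SEQ_NO BIGINT> PRIMARY KEY,
    MESSAGENO STRING,
    MESSAGECREATIONDATETIME TIMESTAMP,
    OPCO_GLN STRING,
    STORENO INT
) WITH (kafka_topic='ah_topics_2639_sales', key_format='JSON', value_format='JSON');

-- Dereference the struct field to get the numeric key value.
SELECT K->MESG_SEQ_NO AS MESG_SEQ_NO, MESSAGENO, STORENO
FROM MSG_2639_SALES_TRNS_STORES_TABLE_S
EMIT CHANGES;
```

    The other option is to change the producer so that it writes the key as a bare value (1015 rather than {"MESG_SEQ_NO":1015}); a primitive key column like the one in the question should then deserialize.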