I am trying to create a table in KSQL on top of a Kafka topic. The table is created successfully, but when I select data from it I get this error:
org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT
Data in Topic:
Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/11/15 14:03:55.013 Z, key: {"MESG_SEQ_NO":1015}, value: {"MESG_SEQ_NO":1015,"MESSAGENO":"1015","MESSAGECREATIONDATETIME":1554336000000,"OPCO_GLN":"L","OPCO_COUNTRYCODE":"L","STORENO":5809,"SRVPNO":320,"ENDDATETIMETRANSACTION":1554336000000,"DATETIMESENTSTORE":1554336000000,"MESG_ACTION":"I","MESG_IND_COMPLETED":"N","MESG_IND_SENTTOBROKER":"N","MESG_ARTCLE_SALES_SEQ_NO":1015,"NASANUMBER":584623,"LKARNUMBER":null,"AMOUNTCE":1,"AMOUNTWEIGHT":null,"VOLUME":null,"AMOUNTCEDISCOUNT":null,"AMOUNTCEPRICED":null,"AMOUNTWEIGHTDISCOUNT":null,"AMOUNTWEIGHTPRICED":null,"SALEVALUE":null,"DISCOUNTVALUE":null,"PROMOTIONVALUE":null}, partition: 0
Table Structure
ksql> CREATE TABLE MSG_2639_SALES_TRNS_STORES_TABLE_S( MESG_SEQ_NO INT(4,0) PRIMARY KEY, MESSAGENO STRING, MESSAGECREATIONDATETIME TIMESTAMP,
>OPCO_GLN STRING, OPCO_COUNTRYCODE STRING, STORENO INT(4,0), SRVPNO INT(4,0),
>ENDDATETIMETRANSACTION TIMESTAMP, DATETIMESENTSTORE TIMESTAMP, MESG_ACTION STRING, MESG_IND_COMPLETED STRING, MESG_IND_SENTTOBROKER STRING)
>with (kafka_topic='ah_topics_2639_sales' , key_format='JSON', value_format='JSON');
Message
---------------
Table created
---------------
Selecting Data :
ksql> select * from MSG_2639_SALES_TRNS_STORES_TABLE_S emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|MESG_SEQ_NO |MESSAGENO |MESSAGECREAT|OPCO_GLN |OPCO_COUNTRY|STORENO |SRVPNO |ENDDATETIMET|DATETIMESENT|MESG_ACTION |MESG_IND_COM|MESG_IND_SEN|
| | |IONDATETIME | |CODE | | |RANSACTION |STORE | |PLETED |TTOBROKER |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
I don't get any data, and I see this error in the logs:
org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT
You declared MESG_SEQ_NO INT(4,0) PRIMARY KEY, but the record key is {"MESG_SEQ_NO":1015}, which is a struct (a JSON object), not an integer. The name of the JSON field isn't automatically extracted and matched to the key column.
That is why the deserializer fails with: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT
Last I checked, keys in ksql should ideally be primitive types rather than structured objects. If the key is structured, you need to declare that explicitly; see https://docs.ksqldb.io/en/latest/reference/serialization/#deserialization-of-single-keys
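As a rough sketch of what "being explicit" could look like, the key column can be declared as a STRUCT whose single field matches the JSON object in the record key. This assumes a ksqlDB version that supports STRUCT key columns with key_format='JSON'. The table name MSG_2639_SALES_STRUCT_KEY and the key column name K are made up for illustration, and only a few of the value columns are listed. Plain INT is used in place of INT(4,0).

```sql
-- Hypothetical table and key column names; adjust to your setup.
-- The key column is a STRUCT because the record key is {"MESG_SEQ_NO":1015},
-- a JSON object rather than a bare number.
CREATE TABLE MSG_2639_SALES_STRUCT_KEY (
  K STRUCT<MESG_SEQ_NO INT> PRIMARY KEY,
  MESSAGENO STRING,
  STORENO INT,
  SRVPNO INT,
  MESG_ACTION STRING
) WITH (
  kafka_topic='ah_topics_2639_sales',
  key_format='JSON',
  value_format='JSON'
);

-- Read the number out of the struct key with the -> operator.
SELECT K->MESG_SEQ_NO AS MESG_SEQ_NO, MESSAGENO, STORENO
FROM MSG_2639_SALES_STRUCT_KEY
EMIT CHANGES;
```

The other route is to change the producer (or whatever writes the topic) so the key is the bare number 1015 instead of a JSON object, in which case a primitive key column would deserialize without a struct.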