Search code examples
ksqldb

ksqldb can't get delimited message SerializationException


I have a Kafka topic with messages from my firewall in JSON-format. Every message has a field "message" which is comma-separated value:

{
    "@timestamp": 1690745699,
    "pri": "14",
    "time": "Jul 30 19:34:59",
    "ident": "FIREWALL",
    "message": "aa,bb,cc,dd"
}

I want to delimit this value in another stream to get such result:

ksql> select * from CONVERT_FROM_PALOALTO_SER;
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+      
|FIRST                                       |SECOND                                      |THIRD                                       |FOURTH                                      |      
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+      
|aa                                          |bb                                          |cc                                          |dd                                          |    

So, I create first stream with only one field - message:

CREATE STREAM CONVERT_FROM_FIREWALL ( message string) WITH (KAFKA_TOPIC='from-syslog-firewall', VALUE_FORMAT='JSON');

ksql> select * from CONVERT_FROM_FIREWALL limit 1;
+MESSAGE                                                                                                                                                                                  
|+-|aa,bb,cc,dd

Next, I create stream which writes only value without field:

CREATE STREAM CONVERT_FROM_FIREWALL_DELIMITED  WITH (VALUE_FORMAT='DELIMITED', KAFKA_TOPIC='from-syslog-firewall_delimited')  AS SELECT * FROM CONVERT_FROM_FIREWALL;

In Kafka i see the message in double quotes: Kafka-ui message

Then i create third stream:

CREATE STREAM CONVERT_FROM_FIREWALL_SER ( first string, second string, third string, fourth string)        
 WITH (KAFKA_TOPIC='from-syslog-firewall_delimited', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER=',');

But when new message appears in Kafka, i get ksqldb error:

Caused by: org.apache.kafka.common.errors.SerializationException: Column count mismatch on deserialization. topic: from-syslog-firewall_delimited, expected: 4, got: 1

It should be mentioned, that if i send message to kafka without quotes, there are no errors in ksqldb and stream gets value from topic: Kafka message without quotes

ksql> select * from CONVERT_FROM_FIREWALL_SER;
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+      
|FIRST                                       |SECOND                                      |THIRD                                       |FOURTH                                      |      
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+      
|aa                                          |bb                                          |cc                                          |dd                                          | 

How should I deal with this kind of a problem? What am I doing wrong?


Solution

  • For my case, changing value_format to KAFKA helped:

    CREATE STREAM CONVERT_FROM_FIREWALL_DELIMITED  WITH (VALUE_FORMAT='KAFKA', KAFKA_TOPIC='from-syslog-firewall_delimited')  AS SELECT * FROM CONVERT_FROM_FIREWALL;
    

    With this format, messages get to topic without double quotes.