I have a Kafka topic with messages from my firewall in JSON format. Every message has a field "message" whose value is a comma-separated string:
{
"@timestamp": 1690745699,
"pri": "14",
"time": "Jul 30 19:34:59",
"ident": "FIREWALL",
"message": "aa,bb,cc,dd"
}
I want to split this value in another stream to get a result like this:
ksql> select * from CONVERT_FROM_PALOALTO_SER;
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|FIRST |SECOND |THIRD |FOURTH |
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|aa |bb |cc |dd |
So, I create the first stream with only one field, message:
CREATE STREAM CONVERT_FROM_FIREWALL ( message string) WITH (KAFKA_TOPIC='from-syslog-firewall', VALUE_FORMAT='JSON');
ksql> select * from CONVERT_FROM_FIREWALL limit 1;
+--------------------------------------------+
|MESSAGE                                     |
+--------------------------------------------+
|aa,bb,cc,dd                                 |
Next, I create a stream that writes only the value, without the field name:
CREATE STREAM CONVERT_FROM_FIREWALL_DELIMITED WITH (VALUE_FORMAT='DELIMITED', KAFKA_TOPIC='from-syslog-firewall_delimited') AS SELECT * FROM CONVERT_FROM_FIREWALL;
In Kafka, I see the message wrapped in double quotes:
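Based on the sample message above, the record value in from-syslog-firewall_delimited is roughly:
"aa,bb,cc,dd"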
Then I create a third stream:
CREATE STREAM CONVERT_FROM_FIREWALL_SER ( first string, second string, third string, fourth string)
WITH (KAFKA_TOPIC='from-syslog-firewall_delimited', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER=',');
But when a new message appears in Kafka, I get a ksqlDB error:
Caused by: org.apache.kafka.common.errors.SerializationException: Column count mismatch on deserialization. topic: from-syslog-firewall_delimited, expected: 4, got: 1
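My reading of this error (an assumption based on the observed behaviour, not on the documentation): the DELIMITED serializer quotes a field whose value contains the delimiter, and the downstream DELIMITED deserializer then treats the whole quoted string as a single column, hence "expected: 4, got: 1". For the four-column schema to parse, the record value would need to be the plain, unquoted string:
aa,bb,cc,dd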
It should be mentioned that if I send a message to Kafka without quotes, there are no errors in ksqlDB and the stream reads the values from the topic:
ksql> select * from CONVERT_FROM_FIREWALL_SER;
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|FIRST |SECOND |THIRD |FOURTH |
+--------------------------------------------+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|aa |bb |cc |dd |
How should I deal with this kind of problem? What am I doing wrong?
In my case, changing VALUE_FORMAT to KAFKA helped:
CREATE STREAM CONVERT_FROM_FIREWALL_DELIMITED WITH (VALUE_FORMAT='KAFKA', KAFKA_TOPIC='from-syslog-firewall_delimited') AS SELECT * FROM CONVERT_FROM_FIREWALL;
With this format, messages land in the topic without double quotes.
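As a side note, the splitting can also be done without the intermediate topic by applying the SPLIT function directly on the JSON stream. This is only a sketch of an alternative I believe works (SPLIT returns a 1-indexed array; the stream name is made up), not part of the fix above:
CREATE STREAM CONVERT_FROM_FIREWALL_SPLIT AS
  SELECT SPLIT(message, ',')[1] AS first,
         SPLIT(message, ',')[2] AS second,
         SPLIT(message, ',')[3] AS third,
         SPLIT(message, ',')[4] AS fourth
  FROM CONVERT_FROM_FIREWALL;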