Is there a way to split a message into multiple messages using KSQL and publish them to a new topic? To be clear, I am not looking for a Java-based listener that iterates over the messages and streams them to a new topic; I am looking for a KSQL statement that does this for me.
For example, let's say I need messages in the invoice topic split into item_inventory_delta messages.
invoice topic
key: saleschecknumber
message example:
{
  "total": 12.33,
  "salecounter": 1,
  "items": [
    {
      "itemId": 123,
      "quantity": 1
    },
    {
      "itemId": 345,
      "quantity": 5
    }
  ]
}
item_inventory_delta topic
key: saleschecknumber_itemID
message examples:
1.
{
  "itemId": 123,
  "quantity": 1
}
2.
{
  "itemId": 345,
  "quantity": 5
}
As of ksqlDB 0.6 you can now do this, thanks to the addition of the EXPLODE table function.
Given a topic invoice with a JSON payload per your example, first inspect the topic using PRINT to dump its contents:
ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}
Then declare a schema on top of the topic, which gives us a ksqlDB stream:
CREATE STREAM INVOICE (total DOUBLE,
                       salecounter INT,
                       items ARRAY<STRUCT<itemId INT,
                                          quantity INT>>)
  WITH (KAFKA_TOPIC='invoice',
        VALUE_FORMAT='JSON');
This simply "registers" the existing topic for use with ksqlDB. No new Kafka topic is written until the next step.
Next, create a new Kafka topic, populated continually from the messages arriving in the source stream. EXPLODE emits one output row for each element of the ITEMS array:
CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS
  SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID,
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY
    FROM INVOICE;
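To make the semantics concrete, here is a minimal Python sketch (not ksqlDB code) of what the query above does to each message: one input invoice becomes one output row per element of its items array. The helper name explode_items is made up for illustration, and the upper-cased field names simply mirror the column names ksqlDB produces for this query.

```python
import json


def explode_items(invoice_json):
    """Return one output row per element of the invoice's "items" array.

    Hypothetical helper for illustration only: it mimics the effect of
    SELECT EXPLODE(ITEMS)->ITEMID, EXPLODE(ITEMS)->QUANTITY on one message.
    """
    invoice = json.loads(invoice_json)
    rows = []
    # A missing or empty "items" array yields no output rows at all.
    for item in invoice.get("items") or []:
        rows.append({"ITEMID": item["itemId"], "QUANTITY": item["quantity"]})
    return rows


# The invoice message from the question, as a single JSON string.
invoice_message = (
    '{"total": 12.33, "salecounter": 1, '
    '"items": [{"itemId": 123, "quantity": 1}, {"itemId": 345, "quantity": 5}]}'
)

for row in explode_items(invoice_message):
    print(json.dumps(row))
```

This is only a model of the per-message behaviour; in ksqlDB the statement runs continuously against every new message arriving on the source topic.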
The new topic has been created:
ksql> SHOW TOPICS;
 Kafka Topic          | Partitions | Partition Replicas
-------------------------------------------------------
 invoice              | 1          | 1
 item_inventory_delta | 1          | 1
The topic contains the delta messages, as requested :)
ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}