Tags: apache-kafka, ksqldb

Kafka streams - KSQL - Split messages and publish to another topic


Is there a way to split a message into multiple messages using KSQL and publish them to a new topic? To be clear, I am not looking for a Java-based listener that iterates over the messages and streams them to a new topic; instead, I am looking for a KSQL query that does that for me.

For example:

Let's say I need the messages in the invoice topic split into item_inventory_delta messages:

invoice topic

key: saleschecknumber

message example:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}

item_inventory_delta topic

key: saleschecknumber_itemID

message examples

1.

{
    "itemId": 123,
    "quantity": 1
}

2.

{
    "itemId": 345,
    "quantity": 5
}

Solution

  • As of ksqlDB 0.6 you can now do this, thanks to the addition of the EXPLODE table function.

    Given a topic invoice with JSON payload per your example, first inspect the topic using PRINT to dump its contents:

    ksql> PRINT invoice FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}
    

    Then declare a schema on top of the topic, which gives us a ksqlDB stream:

    CREATE STREAM INVOICE (total DOUBLE, 
                           salecounter INT, 
                           items ARRAY<STRUCT<itemId INT, 
                                              quantity INT>>) 
                    WITH (KAFKA_TOPIC='invoice', 
                          VALUE_FORMAT='JSON');
    

    This simply "registers" the existing topic for use with ksqlDB. No new Kafka topics are written until the next step.
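    Before creating the derived stream, you can check how ksqlDB registered the nested ITEMS column and preview what EXPLODE produces. This is a sketch that assumes a ksqlDB release (0.6 or later) in which transient push queries take EMIT CHANGES; the SELECT only prints rows to the CLI and writes no topic:

    ```sql
    -- Show the columns ksqlDB registered for the stream, including ARRAY<STRUCT<...>>.
    DESCRIBE INVOICE;

    -- Read messages already in the topic too (the default is 'latest').
    SET 'auto.offset.reset' = 'earliest';

    -- Transient query: one output row per element of ITEMS; no topic is created.
    SELECT ROWKEY,
           EXPLODE(ITEMS)->ITEMID AS ITEMID,
           EXPLODE(ITEMS)->QUANTITY AS QUANTITY
      FROM INVOICE
      EMIT CHANGES
      LIMIT 2;
    ```

    Non-exploded columns such as ROWKEY are repeated on every row produced from the same invoice. The SET applies to the rest of the CLI session, so a persistent query created afterwards also starts from the beginning of invoice and picks up messages produced before it existed.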

    Create a new Kafka topic, populated continually from the messages arriving in the source stream:

    CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
      SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
             EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
        FROM INVOICE;
    

    The new topic has been created:

    ksql> SHOW TOPICS;
    
     Kafka Topic                     | Partitions | Partition Replicas
    -------------------------------------------------------------------
     invoice                         | 1          | 1
     item_inventory_delta            | 1          | 1
    

    The topic has the delta messages, as requested :)

    ksql> PRINT item_inventory_delta;
    Format:JSON
    {"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
    {"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}
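    Note that ROWKEY in this output is null. The query above keeps the source message key (and the sample invoice message was produced without one), so it does not build the saleschecknumber_itemID key from the question. One possible way to get there is sketched below, assuming invoice messages are keyed by saleschecknumber and assuming ksqlDB 0.6-era key handling: an implicit STRING ROWKEY holding the message key, and PARTITION BY naming a column of the query result. Later releases changed how keys are declared and repartitioned, so check the documentation for your version. The stream names INVENTORY_EXPLODED and INVENTORY_KEYED and the topic item_inventory_delta_keyed are hypothetical.

    ```sql
    -- Step 1 (hypothetical name): explode each invoice and carry the source
    -- message key (saleschecknumber) along as an ordinary value column.
    CREATE STREAM INVENTORY_EXPLODED AS
      SELECT ROWKEY AS SALESCHECKNUMBER,
             EXPLODE(ITEMS)->ITEMID AS ITEMID,
             EXPLODE(ITEMS)->QUANTITY AS QUANTITY
        FROM INVOICE;

    -- Step 2 (hypothetical names): build "<saleschecknumber>_<itemId>" and
    -- repartition on it so it becomes the Kafka message key.
    -- CONCAT is nested because older releases accept only two arguments.
    CREATE STREAM INVENTORY_KEYED WITH (KAFKA_TOPIC='item_inventory_delta_keyed') AS
      SELECT CONCAT(CONCAT(SALESCHECKNUMBER, '_'), CAST(ITEMID AS STRING)) AS ITEM_KEY,
             ITEMID,
             QUANTITY
        FROM INVENTORY_EXPLODED
        PARTITION BY ITEM_KEY;
    ```

    Splitting the explode and the re-key into two queries means CONCAT and PARTITION BY only ever see ordinary columns, which avoids relying on whether a single query may combine them with a table function.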