
How to create KSQLdb stream using EXTRACTJSONFIELD(message, '$.payload.version')


How do I create a ksqlDB stream with a JSON value format using EXTRACTJSONFIELD? Would this be done with a SELECT statement? I'm not clear how field names can be defined for the stream when it is created with the EXTRACTJSONFIELD function.

Thanks


Solution

  • Yes: create the stream from a SELECT (CREATE STREAM ... AS SELECT), and use AS to alias each extracted value as a field name. Here's an example.

    Dummy source data:

     ksql> PRINT 'source_data' FROM BEGINNING;
     Format:JSON
     {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
     {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
    

    Declare the source stream

     CREATE STREAM my_stream (Header VARCHAR, 
                              RAFld1 VARCHAR, 
                              RAFld2 VARCHAR, 
                              RBFld1 VARCHAR) 
     WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
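To make the declaration step concrete, here is a minimal Python sketch of how each dummy message maps onto the four VARCHAR columns. Python is used only for illustration and this is not how ksqlDB works internally. The sketch assumes that ksqlDB exposes a nested JSON object declared as VARCHAR as its JSON text (the exact whitespace may differ in ksqlDB), that absent fields become NULL, and that ROWTIME/ROWKEY in the PRINT output are record metadata rather than part of the value. `RAW_MESSAGES`, `COLUMNS` and `to_row` are hypothetical names.

```python
import json

# Values of the two dummy records. Assumption: ROWTIME/ROWKEY shown by PRINT
# are record metadata, not fields of the message value.
RAW_MESSAGES = [
    '{"Header":{"RecType":"RecA"},'
    '"RAFld1":{"someFld":"some data","someOtherField":1.001},'
    '"RAFld2":{"aFld":"data","anotherFld":98.6}}',
    '{"Header":{"RecType":"RecB"},'
    '"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}',
]

# Columns declared in CREATE STREAM my_stream, all VARCHAR.
COLUMNS = ["Header", "RAFld1", "RAFld2", "RBFld1"]


def to_row(message: str) -> dict:
    """Map one JSON message onto the declared VARCHAR columns.

    Assumption (hypothetical emulation): a nested object declared as VARCHAR
    is kept as its JSON text, and a field absent from the message becomes
    None (NULL in ksqlDB).
    """
    value = json.loads(message)
    row = {}
    for col in COLUMNS:
        field = value.get(col)
        if field is None:
            row[col] = None  # field missing from this record -> NULL
        elif isinstance(field, str):
            row[col] = field  # plain strings pass through unchanged
        else:
            # Nested object/number -> compact JSON text
            row[col] = json.dumps(field, separators=(",", ":"))
    return row


for row in map(to_row, RAW_MESSAGES):
    print(row)
```

The RecB record has no RAFld1 or RAFld2, so those columns are NULL for it; this is why the derived stream below filters on Header before extracting the RecA fields.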
    

    Create the derived stream

     CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS 
     SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, 
            EXTRACTJSONFIELD(RAFld1,'$.someFld')        AS someFld, 
            EXTRACTJSONFIELD(RAFld2,'$.aFld')           AS aFld, 
            EXTRACTJSONFIELD(RAFld2,'$.anotherFld')     AS anotherFld 
     FROM my_stream 
     WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
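The derived-stream query can be emulated outside ksqlDB with a minimal Python sketch, to show what the AS aliases and the WHERE filter produce for the dummy data. It assumes that EXTRACTJSONFIELD returns string values unquoted and any other value as its JSON text (so numbers arrive as strings), and that unquoted aliases are upper-cased, as ksqlDB does with unquoted identifiers. `extract_json_field` and `rec_a_data` are hypothetical, simplified stand-ins; the real function accepts a broader set of JSONPath expressions than the dotted `$.field` paths handled here.

```python
import json
from typing import Iterable, Iterator, Optional


def extract_json_field(text: Optional[str], path: str) -> Optional[str]:
    """Hypothetical, simplified stand-in for EXTRACTJSONFIELD.

    Supports only dotted paths like '$.a.b' (no array indexes or wildcards).
    Assumption: strings are returned unquoted, other values as JSON text.
    """
    if text is None:
        return None  # NULL input -> NULL output
    if not path.startswith("$."):
        raise ValueError("only '$.field[.field...]' paths are supported")
    node = json.loads(text)
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None  # path not present -> NULL
        node = node[key]
    if node is None:
        return None
    return node if isinstance(node, str) else json.dumps(node)


def rec_a_data(rows: Iterable[dict]) -> Iterator[dict]:
    """Emulate CREATE STREAM recA_data ... AS SELECT ... WHERE ... = 'RecA'."""
    for row in rows:
        # WHERE clause: rows whose RecType is missing or not 'RecA' are dropped
        if extract_json_field(row["Header"], "$.RecType") != "RecA":
            continue
        # SELECT list: each AS alias becomes an (upper-cased) output field
        yield {
            "SOMEOTHERFIELD": extract_json_field(row["RAFld1"], "$.someOtherField"),
            "SOMEFLD": extract_json_field(row["RAFld1"], "$.someFld"),
            "AFLD": extract_json_field(row["RAFld2"], "$.aFld"),
            "ANOTHERFLD": extract_json_field(row["RAFld2"], "$.anotherFld"),
        }


# my_stream rows for the two dummy records (VARCHAR columns hold JSON text).
rows = [
    {"Header": '{"RecType":"RecA"}',
     "RAFld1": '{"someFld":"some data","someOtherField":1.001}',
     "RAFld2": '{"aFld":"data","anotherFld":98.6}',
     "RBFld1": None},
    {"Header": '{"RecType":"RecB"}',
     "RAFld1": None,
     "RAFld2": None,
     "RBFld1": '{"randomFld":"random data","randomOtherField":1.001}'},
]

print(list(rec_a_data(rows)))
```

Only the RecA record passes the WHERE clause, and someOtherField and anotherFld come out as strings because EXTRACTJSONFIELD returns a string. If numeric types are wanted in recA_data, wrapping the call in a cast, for example CAST(EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS DOUBLE), is the usual approach.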
    

    (Source)