How do I create a ksqlDB stream with a JSON value format using EXTRACTJSONFIELD? Would this be done using a SELECT statement? I am not clear on how field names can be defined for the stream when it is created using the EXTRACTJSONFIELD function.
Thanks
You use AS to alias the field name. Here's an example.
Dummy source data:
ksql> PRINT 'source_data' FROM BEGINNING;
Format:JSON
{"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
{"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
Declare the source stream:
CREATE STREAM my_stream (Header VARCHAR,
RAFld1 VARCHAR,
RAFld2 VARCHAR,
RBFld1 VARCHAR)
WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
Create the derived stream:
CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS
SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField,
EXTRACTJSONFIELD(RAFld1,'$.someFld') AS someFld,
EXTRACTJSONFIELD(RAFld2,'$.aFld') AS aFld,
EXTRACTJSONFIELD(RAFld2,'$.anotherFld') AS anotherFld
FROM my_stream
WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
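One thing to watch for: EXTRACTJSONFIELD returns its result as a VARCHAR, so numeric fields such as someOtherField and anotherFld end up as strings in the derived stream. If you want them typed, you can wrap the call in CAST. A minimal sketch, assuming the same my_stream as above (the stream name recA_data_typed is just a hypothetical name for illustration):

```sql
-- Hypothetical variant of the derived stream with numeric columns cast to DOUBLE.
-- EXTRACTJSONFIELD yields VARCHAR, so CAST converts the extracted text to a number.
CREATE STREAM recA_data_typed WITH (VALUE_FORMAT='AVRO') AS
  SELECT CAST(EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS DOUBLE) AS someOtherField,
         EXTRACTJSONFIELD(RAFld1,'$.someFld') AS someFld,
         EXTRACTJSONFIELD(RAFld2,'$.aFld') AS aFld,
         CAST(EXTRACTJSONFIELD(RAFld2,'$.anotherFld') AS DOUBLE) AS anotherFld
  FROM my_stream
  WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
```

Running DESCRIBE recA_data_typed; afterwards should show the aliased column names and their types.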