apache-kafka apache-kafka-streams ksqldb

Define KSQL STRUCT on JSON valued topic with different types


(Edit: slight edits to better reflect intention, but large edit due to progress made.)

A topic "t_raw" receives messages of multiple types, all of which contain a common "type" key:

{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}

Ultimately, I need to split this into other streams where they will be chopped/aggregated/processed. I'd like to be able to use STRUCT for everything, but my current effort has me doing this:

create stream raw (type varchar, data varchar) \
  with (kafka_topic='t_raw', value_format='JSON');

for the first level, then

create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
  select \
    extractjsonfield(data, '$.ts') as ts, \
    extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b \
  from raw where type='key1';
create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
  select \
    extractjsonfield(data, '$.ts') as ts, \
    extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c, \
    extractjsonfield(data, '$.d') as d \
  from raw where type='key2';

This seems to work, but with the recent addition of STRUCT, is there a way to use it in lieu of extractjsonfield as done above?

ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2

If not with STRUCT, is there a straightforward way to do this with vanilla kafka-streams (instead of KSQL, hence the tag)?

Is there a more kafka-esque/efficient/elegant way to parse this? I cannot define it as an empty STRUCT<>:

ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> ) \
      WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}

There is some (not-so-recent) discussion on being able to do something like

CREATE STREAM key1 ( a INT, b VARCHAR ) AS \
  SELECT data->* from some_input where type = 'key1';

FYI: the above solution will not work in confluent-5.0.0; a recent patch fixed the extractjsonfield bug and enabled this solution.

The real data has several more similar message types. They all contain "type" and "data" keys (and no others at the top-level), and almost all have the "ts" timestamp equivalent nested within "data".


Solution

  • Yes, you can do this: KSQL doesn't mind if a column doesn't exist in a message; you just get a null value for it.

    Test data setup

    Populate some test data into the topic:

    kafkacat -b kafka:29092 -t t_raw -P <<EOF
    {"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
    {"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
    {"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
    {"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
    EOF
    

    Dump the topic to KSQL console for inspection:

    ksql> PRINT 't_raw' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
    {"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
    {"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
    {"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
    ^CTopic printing ceased
    ksql>
    

    Model the source stream of data

    Create a stream over it. Note the use of STRUCT, which declares every field that can appear in any of the message types:

    CREATE STREAM T (TYPE VARCHAR, \
                     DATA STRUCT< \
                          TS VARCHAR, \
                          A INT, \
                          B VARCHAR, \
                          C INT, \
                          D VARCHAR>) \
            WITH (KAFKA_TOPIC='t_raw',\
                  VALUE_FORMAT='JSON');
    

    Set offset to earliest so that we query the whole topic, and then use KSQL to access the full stream:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
    ksql>
    ksql> SELECT * FROM T;
    1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
    1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
    1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
    1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
    ^CQuery terminated
    

    Query the types individually, using the -> operator to access the nested elements:

    ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1'  LIMIT 2;
    1 | hello
    2 | hello2
    
    ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
    1 | 11 | goodbye
    3 | 22 | goodbye2
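
    To see the null behaviour directly, a query along these lines projects fields from both message types without a WHERE clause. It is only a sketch against the stream T defined above. Judging by the SELECT * output shown earlier, DATA->B should come back as null for key2 rows and DATA->C as null for key1 rows:

    ```sql
    -- B exists only in key1 messages, C only in key2 messages;
    -- the field missing from a given message is expected to be null.
    SELECT TYPE, DATA->B, DATA->C FROM T LIMIT 4;
    ```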
    

    Persist the data in separate Kafka topics

    Populate target topics with the separated data:

    ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';
    
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';
    
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    

    Schema for the new streams:

    ksql> DESCRIBE TYPE_1;
    
    Name                 : TYPE_1
    Field    | Type
    --------------------------------------
    ROWTIME  | BIGINT           (system)
    ROWKEY   | VARCHAR(STRING)  (system)
    DATA__TS | VARCHAR(STRING)
    DATA__A  | INTEGER
    DATA__B  | VARCHAR(STRING)
    --------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    ksql> DESCRIBE TYPE_2;
    
    Name                 : TYPE_2
    Field    | Type
    --------------------------------------
    ROWTIME  | BIGINT           (system)
    ROWKEY   | VARCHAR(STRING)  (system)
    DATA__TS | VARCHAR(STRING)
    DATA__A  | INTEGER
    DATA__C  | INTEGER
    DATA__D  | VARCHAR(STRING)
    --------------------------------------
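
    The derived columns are named DATA__TS, DATA__A and so on, and the rows keep the ROWTIME of the source messages rather than the event time held in ts. If you want plain column names and the timestamp handling from the question, one option is to alias the fields and pass the same TIMESTAMP properties the question used. This is a sketch, not part of the session above: the stream name TYPE_1_TS is made up for illustration, and it assumes a KSQL version that accepts these properties in CREATE STREAM ... AS SELECT, as the question's own statements do:

    ```sql
    -- Hypothetical stream name; the aliases give columns TS, A, B instead of DATA__TS etc.
    -- TIMESTAMP / TIMESTAMP_FORMAT are copied from the question's statements.
    CREATE STREAM TYPE_1_TS \
      WITH (TIMESTAMP='TS', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss.S') AS \
      SELECT DATA->TS AS TS, DATA->A AS A, DATA->B AS B \
      FROM T WHERE TYPE='key1';
    ```

    The same pattern would apply to the key2 messages, selecting DATA->C and DATA->D instead of DATA->B.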
    

    Each KSQL stream is backed by a Kafka topic:

    ksql> LIST TOPICS;
    
    Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
    ---------------------------------------------------------------------------------------------------------
    t_raw                       | true       | 1          | 1                  | 2         | 2
    TYPE_1                      | true       | 4          | 1                  | 0         | 0
    TYPE_2                      | true       | 4          | 1                  | 0         | 0
    ---------------------------------------------------------------------------------------------------------