postgresql apache-kafka ksqldb debezium cdc

How to select a value from a JSON string in KSQL?


I have a JSONB field called metadata in a Postgres table. When I use the Debezium PostgreSQL connector to generate CDC events, it writes metadata as a string into Kafka.

This is one CDC event I got in the Kafka topic my_db_server.public.product:

{
  "before": null,
  "after": {
    "id": "322f13b2-9a0e-407e-94c1-633c7b2a6ca1",
    "metadata": "{\"operation\": \"CREATE\"}"    <-- Here is a string
  },
  "source": {
    "version": "1.8.0.Final",
    "connector": "postgresql",
    "name": "my_db_server",
    "ts_ms": 1648074184197,
    "snapshot": "false",
    "db": "my_db",
    "sequence": "[\"25825800\",\"25833896\"]",
    "schema": "public",
    "table": "product",
    "txId": 673,
    "lsn": 25833896,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1648074184256,
  "transaction": null
}

Saving a Postgres JSON/JSONB field as a string in Kafka is how it is supposed to work, according to https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-basic-types


I created a stream in KSQL with:

CREATE STREAM my_stream (after STRUCT<id STRING,
                                      metadata STRING>)
              WITH (KAFKA_TOPIC='my_db_server.public.product',
                    VALUE_FORMAT='JSON');

Next I want to do some work based on operation in the metadata field. I can select after->metadata, but not after->metadata->operation because metadata is a string:

SELECT after->id,
       after->metadata->operation,    <-- Give error: Expected STRUCT type, got: STRING
       COUNT(after->id),
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after    <-- I hope to change to after->metadata->operation next
EMIT CHANGES;

This will give me the error:

Expected STRUCT type, got: STRING

What would be the proper way to select after->metadata->operation in this case? Or can I do some work when I create the stream? Thanks!


Solution

  • You can access operation using the EXTRACTJSONFIELD function, like this:

    SELECT after->id,
           EXTRACTJSONFIELD(after->metadata, '$.operation'),
           COUNT(after->id),
           WINDOWSTART AS window_start,
           WINDOWEND AS window_end
    FROM my_stream
    WINDOW TUMBLING (SIZE 20 SECONDS)
    GROUP BY after
    EMIT CHANGES;
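
The question also asks about grouping by operation and about doing the work when the stream is created. A minimal sketch of both, assuming the my_stream definition from the question: the derived stream name product_operations and the aliases id, operation, and event_count are hypothetical, and this has not been run against the asker's cluster.

```sql
-- Option 1: group directly on the extracted JSON field.
SELECT EXTRACTJSONFIELD(after->metadata, '$.operation') AS operation,
       COUNT(*) AS event_count,
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY EXTRACTJSONFIELD(after->metadata, '$.operation')
EMIT CHANGES;

-- Option 2: extract the field once into a derived stream
-- (product_operations is a hypothetical name), then query it
-- like an ordinary STRING column.
CREATE STREAM product_operations AS
  SELECT after->id AS id,
         EXTRACTJSONFIELD(after->metadata, '$.operation') AS operation
  FROM my_stream
  EMIT CHANGES;

SELECT operation,
       COUNT(*) AS event_count,
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end
FROM product_operations
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY operation
EMIT CHANGES;
```

EXTRACTJSONFIELD returns a STRING, so if a numeric value inside metadata were needed it would have to be CAST explicitly. The derived stream is probably the more convenient choice when several queries need the same field, since the JSON path is written only once.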