I have a JSONB field called metadata
in a Postgres table. When I use the Debezium PostgreSQL connector to generate CDC events, it writes metadata
as a string into Kafka.
Here is one CDC event I got in the Kafka topic my_db_server.public.product
:
{
"before": null,
"after": {
"id": "322f13b2-9a0e-407e-94c1-633c7b2a6ca1",
"metadata": "{\"operation\": \"CREATE\"}". <-- Here is a string
},
"source": {
"version": "1.8.0.Final",
"connector": "postgresql",
"name": "my_db_server",
"ts_ms": 1648074184197,
"snapshot": "false",
"db": "my_db",
"sequence": "[\"25825800\",\"25833896\"]",
"schema": "public",
"table": "product",
"txId": 673,
"lsn": 25833896,
"xmin": null
},
"op": "c",
"ts_ms": 1648074184256,
"transaction": null
}
Saving a Postgres JSON/JSONB field as a string in the Kafka CDC event is how it is supposed to work, according to https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-basic-types
I created a Kafka stream with ksqlDB:
CREATE STREAM my_stream (after STRUCT<id STRING,
metadata STRING>)
WITH (KAFKA_TOPIC='my_db_server.public.product',
VALUE_FORMAT='JSON');
Next I want to do some work based on operation
in the metadata
field. I can select after->metadata
, but not after->metadata->operation
because metadata
is a string:
SELECT after->id,
after->metadata->operation, <-- Give error: Expected STRUCT type, got: STRING
COUNT(after->id),
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after <-- I hope to change to after->metadata->operation next
EMIT CHANGES;
This will give me the error:
Expected STRUCT type, got: STRING
What would be the proper way to select after->metadata->operation
in this case? Or is there something I can do when I create the stream? Thanks!
You can access operation
using the EXTRACTJSONFIELD function, like this:
SELECT after->id,
EXTRACTJSONFIELD(after->metadata, '$.operation'),
COUNT(after->id),
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after
EMIT CHANGES;
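Since EXTRACTJSONFIELD returns a VARCHAR (or NULL if the path is missing), you can also use it directly as the grouping key, which is the aggregation the question says it is working toward. A sketch, assuming the same my_stream definition as above:

```sql
-- Count events per operation in 20-second tumbling windows.
-- ksqlDB allows expressions in GROUP BY, so the extracted JSON
-- field can serve as the grouping key directly.
SELECT EXTRACTJSONFIELD(after->metadata, '$.operation') AS operation,
       COUNT(after->id) AS event_count,
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY EXTRACTJSONFIELD(after->metadata, '$.operation')
EMIT CHANGES;
```

Note that events whose metadata has no operation key will be grouped under a NULL key, so you may want to filter those out with a WHERE clause first.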