ClickHouse can definitely read JSON messages from Kafka if they are flat JSON documents. We indicate this with kafka_format = 'JSONEachRow' in ClickHouse.
This is how we currently use it:
CREATE TABLE topic1_kafka
(
ts Int64,
event String,
title String,
msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
kafka_topic_list = 'topic1', kafka_num_consumers = 1, kafka_group_name = 'ch1',
kafka_format = 'JSONEachRow'
This is fine as long as producers send flat JSON to topic1_kafka. But not all producers send flat JSON; most of the applications generate nested JSON documents like this:
{
"ts": 1598033988,
"deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
"location": [39.920515, 32.853708],
"stats": {
"temp": 71.2,
"total_memory": 32,
"used_memory": 21.2
}
}
Unfortunately, the JSON document above is not compatible with JSONEachRow, so ClickHouse cannot map the fields in the JSON document to columns in the table.
Is there any way to do this mapping?
EDIT: We want to map the nested JSON to a flat table like this:
CREATE TABLE topic1
(
ts Int64,
deviceId String,
location_1 Float64,
location_2 Float64,
stats_temp Float64,
stats_total_memory Float64,
stats_used_memory Float64
) ENGINE = MergeTree()
I had the same problem and was able to solve it using the JSONAsString format. In this format, a single JSON object is interpreted as a single value. It can only be used with a table that has a single field of type String.
CREATE TABLE topic1_kafka (
data String
)
ENGINE = Kafka
SETTINGS
kafka_format = 'JSONAsString',
...
CREATE MATERIALIZED VIEW topic1_mv TO topic1 AS
SELECT
JSONExtractInt(data, 'ts') AS ts,
JSONExtractString(data, 'deviceId') AS deviceId,
JSONExtractFloat(data, 'location', 1) AS location_1, -- arrays are indexed from 1
JSONExtractFloat(data, 'location', 2) AS location_2,
JSONExtractFloat(data, 'stats', 'temp') AS stats_temp,
JSONExtractFloat(data, 'stats', 'total_memory') AS stats_total_memory,
JSONExtractFloat(data, 'stats', 'used_memory') AS stats_used_memory
FROM topic1_kafka
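Before attaching the view, it may help to check the extraction paths against a literal copy of the sample document from the question. The query below is only a sketch for ad-hoc testing: it needs no Kafka table, and the WITH alias named data is just a stand-in for the data column. The JSONExtract* functions return a default value (0 or an empty string) when a key is missing or has the wrong type, so a mistyped key shows up here as a 0 rather than as an error.

```sql
-- Ad-hoc check: run the extraction paths against the sample document as a string literal.
-- "data" here is a constant alias standing in for the Kafka table's data column.
WITH '{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": {"temp": 71.2, "total_memory": 32, "used_memory": 21.2}}' AS data
SELECT
    JSONExtractInt(data, 'ts') AS ts,
    JSONExtractString(data, 'deviceId') AS deviceId,
    JSONExtractFloat(data, 'location', 1) AS location_1,  -- array indexes start at 1
    JSONExtractFloat(data, 'location', 2) AS location_2,
    JSONExtractFloat(data, 'stats', 'temp') AS stats_temp,
    JSONExtractFloat(data, 'stats', 'total_memory') AS stats_total_memory,
    JSONExtractFloat(data, 'stats', 'used_memory') AS stats_used_memory;
```

If every column comes back with the value from the sample document, the same expressions should be safe to use in the materialized view.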