Consuming nested JSON messages from Kafka with ClickHouse


ClickHouse can read JSON messages from Kafka as long as they are flat JSON documents.

We indicate this with kafka_format = 'JSONEachRow' in the Kafka table settings.

This is how we currently use it:

CREATE TABLE topic1_kafka
(
    ts Int64,
    event String,
    title String,
    msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
kafka_topic_list = 'topic1', kafka_num_consumers = 1, kafka_group_name = 'ch1', 
kafka_format = 'JSONEachRow'

This works as long as producers send flat JSON to topic1. But not all producers do; most of our applications generate nested JSON documents like this:

{
  "ts": 1598033988,
  "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
  "location": [39.920515, 32.853708],
  "stats": {
    "temp": 71.2,
    "total_memory": 32,
    "used_memory": 21.2
  }
}

Unfortunately, the JSON document above is not compatible with JSONEachRow as we use it, so ClickHouse cannot map the fields in the JSON document to columns in the table.

Is there any way to do this mapping?

EDIT: We want to map the nested JSON to a flat table like this:

CREATE TABLE topic1
(
    ts Int64,
    deviceId String,
    location_1 Float64,
    location_2 Float64,
    stats_temp Float64,
    stats_total_memory Float64,
    stats_used_memory Float64
) ENGINE = MergeTree()

Solution

  • I had the same problem and solved it with the JSONAsString format. In this format, each JSON object is read as a single value, so the Kafka table must have exactly one column of type String. A materialized view then extracts the individual fields from that string and writes them to the flat table.

    CREATE TABLE topic1_kafka (
      data String
    ) 
    ENGINE = Kafka
    SETTINGS 
        kafka_format = 'JSONAsString',
        ...
    
    
    CREATE MATERIALIZED VIEW topic1_mv TO topic1 AS
    SELECT 
        JSONExtractInt(data, 'ts') AS ts, 
        JSONExtractString(data, 'deviceId') AS deviceId, 
        JSONExtractFloat(data, 'location', 1) AS location_1,  -- arrays are indexed from 1
        JSONExtractFloat(data, 'location', 2) AS location_2,
        JSONExtractFloat(data, 'stats', 'temp') AS stats_temp, 
        JSONExtractFloat(data, 'stats', 'total_memory') AS stats_total_memory, 
        JSONExtractFloat(data, 'stats', 'used_memory') AS stats_used_memory  -- key must match the JSON field name
    FROM topic1_kafka
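
The extraction paths can be tried out without Kafka by running the same JSONExtract* calls against a string literal. The query below is only a sketch: it assumes the sample document from the question as input, and the alias `data` simply stands in for the single String column of the Kafka table.

```sql
-- Sanity check for the JSON paths, using the sample message as a literal.
-- `data` is a stand-in for the String column of the JSONAsString table.
WITH '{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": {"temp": 71.2, "total_memory": 32, "used_memory": 21.2}}' AS data
SELECT
    JSONExtractInt(data, 'ts')                     AS ts,
    JSONExtractString(data, 'deviceId')            AS deviceId,
    JSONExtractFloat(data, 'location', 1)          AS location_1,   -- array indexes start at 1
    JSONExtractFloat(data, 'location', 2)          AS location_2,
    JSONExtractFloat(data, 'stats', 'temp')        AS stats_temp,
    JSONExtractFloat(data, 'stats', 'used_memory') AS stats_used_memory;
```

Each column should come back with the corresponding value from the sample document. A path that does not exist yields the type's default (0 for numbers, an empty string for strings) rather than an error, so a column of zeros in the target table is usually a sign of a misspelled key.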
    
