Search code examples
clickhouseclickhouse-kafka

Virtual columns in Kafka engine


I want to get virtual columns, but when I write the example code I get an error.

I'm following the Kafka engine documentation in Clickhouse:

[kafka-engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka

I have structure:

{
    "first": "name",
    "second": "name",
    "third": [
        "name"
    ],
    "object": {
        "data": {
            "fourth": "name"
        },
        "fifth": "name"
    }
}

CREATE TABLE IF NOT EXISTS test
(
    first String DEFAULT 'empty' Codec(LZ4),
    second String DEFAULT 'empty' Codec(LZ4),
    third String DEFAULT 'empty' Codec(LZ4),
    object_data_fourth String DEFAULT 'empty' Codec(LZ4),
    object_fifth String DEFAULT 'empty' Codec(LZ4),
)
ENGINE = MergeTree()
ORDER BY first;
-- ----------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS json_test
(
    all String
) ENGINE Kafka()
SETTINGS 
        kafka_broker_list = 'localhost:9093',
        kafka_topic_list = 'test',
        kafka_group_name = 'testGroup',
        kafka_format = 'JSONAsString',
        kafka_max_block_size = 1048576;
-- ----------------------------------------------------------------------------
CREATE MATERIALIZED VIEW json_test_mv TO test AS
SELECT
    JSONExtractString(all, 'first') AS first, 
    JSONExtractString(all, 'second') AS second, 
    JSONExtractString(all, 'third', 1) AS third, 
    JSONExtractString(all, 'object', 'data', 'fourth') AS object_data_fourth, 
    JSONExtractString(all, 'object', 'fifth') AS object_fifth
FROM json_test;
-- ----------------------------------------------------------------------------

The documentation tells me that if you need virtual columns you should:

SELECT _topic, _timestamp FROM table;

but I get error:

DB::Exception: Missing columns: '_topic' while processing query: 'SELECT _topic FROM table', required columns: '_topic'.

How to get virtual columns correctly?

I want to get _timestamp in the main table - test.


Solution

  • Virtual columns are available only in your Kafka table (json_test in your example). If you want to access this fields you'll need to extract them in the MV as well like:

    CREATE TABLE IF NOT EXISTS test
    (
        topic String, 
        timestamp Datetime,
        first String DEFAULT 'empty' Codec(LZ4),
        second String DEFAULT 'empty' Codec(LZ4),
        third String DEFAULT 'empty' Codec(LZ4),
        object_data_fourth String DEFAULT 'empty' Codec(LZ4),
        object_fifth String DEFAULT 'empty' Codec(LZ4),
    )
    ENGINE = MergeTree()
    ORDER BY first;
    
    CREATE MATERIALIZED VIEW json_test_mv TO test AS
    SELECT
        _topic AS topic,
        _timestamp AS timestamp,
        JSONExtractString(all, 'first') AS first, 
        JSONExtractString(all, 'second') AS second, 
        JSONExtractString(all, 'third', 1) AS third, 
        JSONExtractString(all, 'object', 'data', 'fourth') AS object_data_fourth, 
        JSONExtractString(all, 'object', 'fifth') AS object_fifth
    FROM json_test;