Search code examples
apache-kafkaprotocol-buffersclickhouse

Using nested protobuf messages with Clickhouse Kafka engine


I'm trying to use Clickhouse Kafka engine with source messages decoded to protobuf. I'm going to use nested schema, but faced with this issue. Here's my code:

syntax = "proto3";

message Product {
  string model = 1;
  float price = 2;
}

message TestProto {
  string name = 1;
  Product product = 2;
}
CREATE TABLE IF NOT EXISTS kafka_table
(
    `name` String,
    `product` Nested(
        `model` String,
        `price` Float32
    )

)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'test_proto_topic',
    kafka_group_name = 'test_proto_topic_ch',
    kafka_format = 'ProtobufSingle',
    format_schema = 'test.proto:TestProto'

CREATE TABLE IF NOT EXISTS target_table
(
    `name` String,
    `model` String,
    `price` Float32
)
ENGINE = Memory

CREATE MATERIALIZED VIEW IF NOT EXISTS target_table_view TO target_table
AS
SELECT
    `name`,
    `product.model` as model,
    `product.price` as price
FROM kafka_table

Everything works well except one thing - columns model and price doesn't filling with values from protobuf nested message Product and stored with default values.

How should I define tables correctly to make this works as expected?


Solution

  • I finally solved this issue, and there are two ways. First is to create source kafka table with tuple:

    CREATE TABLE IF NOT EXISTS kafka_table
    (
        `name` String,
        `product` Tuple(
            model String,
            price Float32
        )
    
    )
    ...
    

    It's named tuple, but you can not access to this fields via names, you need to use indexes:

    CREATE MATERIALIZED VIEW IF NOT EXISTS target_table_view TO target_table
    AS
    SELECT
        name,
        product.1 as model,
        product.2 as price
    FROM kafka_table
    

    It's works, but I think it's not very convenient to use indexes, if your message has a lot of fileds.

    Second is more easily:

    CREATE TABLE IF NOT EXISTS kafka_table
    (
        name String,
        product_model String,
        product_price Float32
    )
    ...
    CREATE MATERIALIZED VIEW IF NOT EXISTS target_table_view TO target_table
    AS
    SELECT
        name,
        product_model as model,
        product_price as price
    FROM kafka_table
    

    Works like magic, here's the docs - https://clickhouse.com/docs/en/interfaces/formats#protobuf Clickhouse tries to find all nested fields by itself and names it like 'x.y.z' or 'x_y_z'