
Adding columns to a ClickHouse Kafka table chain


I have a standard pipeline for streaming data from a Kafka topic into ClickHouse: Table (Kafka engine) -> Materialized view TO -> Destination table (MergeTree). It works well. I now need to add two columns to this pipeline. When I drop the chain of tables and recreate it with the new columns, data stops arriving in the destination table, and the log shows no errors related to streaming. When I recreate the tables without the new columns, it works again. I found that the new columns exist in only one message; the other messages do not have them.

I don't have much experience with Kafka, but my understanding is that no new data is loaded into the destination table because not every Kafka message contains the new columns. What can be done to resolve this? I had the idea of using the JSONEachRow format to stream the raw data into a single column and process it with ClickHouse JSON functions, but that does not work either: it fails with an error saying the input data cannot be parsed. Do some settings need to be changed on the Kafka side? (I haven't seen what the data looks like in Kafka itself; I'm using Offset Explorer to view it.)

            /* -------- CREATE DESTINATION TABLE LOCAL -------- */

            CREATE TABLE database_name.ga_events_local ON CLUSTER 'cluste_name'
            (
                `event_type` String,
                `source` String,
                `timestamp` UInt32,
                `master_id` String,
                `event_params_app` Nullable(String),
                `items_item_list_name` Nullable(String),
                `event_params_parameter` Nullable(String),
                `items_affiliation` Nullable(String),
                `event_params_item_cat` Nullable(String),
                `event_params_item_id` Nullable(String),
                `event_params_step` Nullable(String),
                `event_params_item_name` Nullable(String),
                `event_params_dest` Nullable(String),
                `items_promotion_id` Nullable(String),
                `step_number` Nullable(String),
                `items_item_name` Nullable(String),
                `event_params_item_variant` Nullable(String),
                `event_params_origin` Nullable(String),
                `event_params_way` Nullable(String),
                `items_item_brand` Nullable(String),
                `items_promotion_name` Nullable(String),
                `event_params_item_cat2` Nullable(String),
                `event` Nullable(String),
                `event_params_ux_ui` Nullable(String),
                `items_item_cat2` Nullable(String),
                `items_item_cat3` Nullable(String),
                `event_params_type` Nullable(String),
                `items_item_cat4` Nullable(String),
                `event_params_flow` Nullable(String),
                `event_params_segment` Nullable(String),
                `items_item_id` Nullable(String),
                `items_item_cat5` Nullable(String),
                `itemslocation_id` Nullable(String),
                `event_params_error` Nullable(String),
                `event_params_result` Nullable(String),
                `items_item_cat` Nullable(String),
                `event_params_quantity` Nullable(String),
                `event_params_id` Nullable(String),
                `items_item_variant` Nullable(String),
                `event_name` Nullable(String),
                `items_item_list_index` Nullable(String),
                -- `event_params_item_list_name` Nullable(String),
                -- `event_params_promo_name` Nullable(String),
                `deleted` UInt8 -- MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
            )
            ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/ga_events', '{replica}', timestamp, `deleted`)
            ORDER BY `master_id`
            SETTINGS index_granularity = 8192;

            /* -------- CREATE DESTINATION TABLE DISTRIBUTED -------- */

            CREATE TABLE database_name.ga_events ON CLUSTER 'cluste_name'
            (
                `event_type` String,
                `source` String,
                `timestamp` UInt32,
                `master_id` String,
                `event_params_app` Nullable(String),
                `items_item_list_name` Nullable(String),
                `event_params_parameter` Nullable(String),
                `items_affiliation` Nullable(String),
                `event_params_item_cat` Nullable(String),
                `event_params_item_id` Nullable(String),
                `event_params_step` Nullable(String),
                `event_params_item_name` Nullable(String),
                `event_params_dest` Nullable(String),
                `items_promotion_id` Nullable(String),
                `step_number` Nullable(String),
                `items_item_name` Nullable(String),
                `event_params_item_variant` Nullable(String),
                `event_params_origin` Nullable(String),
                `event_params_way` Nullable(String),
                `items_item_brand` Nullable(String),
                `items_promotion_name` Nullable(String),
                `event_params_item_cat2` Nullable(String),
                `event` Nullable(String),
                `event_params_ux_ui` Nullable(String),
                `items_item_cat2` Nullable(String),
                `items_item_cat3` Nullable(String),
                `event_params_type` Nullable(String),
                `items_item_cat4` Nullable(String),
                `event_params_flow` Nullable(String),
                `event_params_segment` Nullable(String),
                `items_item_id` Nullable(String),
                `items_item_cat5` Nullable(String),
                `itemslocation_id` Nullable(String),
                `event_params_error` Nullable(String),
                `event_params_result` Nullable(String),
                `items_item_cat` Nullable(String),
                `event_params_quantity` Nullable(String),
                `event_params_id` Nullable(String),
                `items_item_variant` Nullable(String),
                `event_name` Nullable(String),
                `items_item_list_index` Nullable(String),
                -- `event_params_item_list_name` Nullable(String),
                -- `event_params_promo_name` Nullable(String),
                `deleted` UInt8 -- MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
            )
            ENGINE = Distributed('cluste_name', 'database_name', 'ga_events_local', javaHash(`master_id`));

            /* -------- CREATE KAFKA TABLE -------- */

            CREATE TABLE database_name.ga_events_kafka
            (
                `event_type` String,
                `source` String,
                `timestamp` UInt64,
                `data.master_id` Nullable(String),
                `data.event_params_app` Nullable(String),
                `data.items_item_list_name` Nullable(String),
                `data.event_params_parameter` Nullable(String),
                `data.items_affiliation` Nullable(String),
                `data.event_params_item_cat` Nullable(String),
                `data.event_params_item_id` Nullable(String),
                `data.event_params_step` Nullable(String),
                `data.event_params_item_name` Nullable(String),
                `data.event_params_dest` Nullable(String),
                `data.items_promotion_id` Nullable(String),
                `data.step_number` Nullable(String),
                `data.items_item_name` Nullable(String),
                `data.event_params_item_variant` Nullable(String),
                `data.event_params_origin` Nullable(String),
                `data.event_params_way` Nullable(String),
                `data.items_item_brand` Nullable(String),
                `data.items_promotion_name` Nullable(String),
                `data.event_params_item_cat2` Nullable(String),
                `data.event` Nullable(String),
                `data.event_params_ux_ui` Nullable(String),
                `data.items_item_cat2` Nullable(String),
                `data.items_item_cat3` Nullable(String),
                `data.event_params_type` Nullable(String),
                `data.items_item_cat4` Nullable(String),
                `data.event_params_flow` Nullable(String),
                `data.event_params_segment` Nullable(String),
                `data.items_item_id` Nullable(String),
                `data.items_item_cat5` Nullable(String),
                `data.itemslocation_id` Nullable(String),
                `data.event_params_error` Nullable(String),
                `data.event_params_result` Nullable(String),
                `data.items_item_cat` Nullable(String),
                `data.event_params_quantity` Nullable(String),
                `data.event_params_id` Nullable(String),
                `data.items_item_variant` Nullable(String),
                `data.event_name` Nullable(String),
                `data.items_item_list_index` Nullable(String) --,
                -- `data.event_params_item_list_name` Nullable(String),
                -- `data.event_params_promo_name` Nullable(String)
            )
            ENGINE = Kafka
            SETTINGS
                kafka_broker_list = 'somebroker:9093,somebroker:9093,somebroker:9093',
                kafka_topic_list = 'SOMETOPIC',
                kafka_format = 'AvroConfluent',
                kafka_num_consumers = 1,
                kafka_group_name = 'somegroup', -- I change this every time I recreate the tables
                format_avro_schema_registry_url = 'registry_url',
                kafka_client_id = 'someclient'; -- I change this every time I recreate the tables
            
            /* -------- CREATE MATERIALIZED VIEW -------- */

            CREATE MATERIALIZED VIEW database_name.ga_events_kafka_mv TO database_name.ga_events
            (
                `event_type` String,
                `source` String,
                `timestamp` DateTime64(3),
                `master_id` Nullable(String),
                `event_params_app` Nullable(String),
                `items_item_list_name` Nullable(String),
                `event_params_parameter` Nullable(String),
                `items_affiliation` Nullable(String),
                `event_params_item_cat` Nullable(String),
                `event_params_item_id` Nullable(String),
                `event_params_step` Nullable(String),
                `event_params_item_name` Nullable(String),
                `event_params_dest` Nullable(String),
                `items_promotion_id` Nullable(String),
                `step_number` Nullable(String),
                `items_item_name` Nullable(String),
                `event_params_item_variant` Nullable(String),
                `event_params_origin` Nullable(String),
                `event_params_way` Nullable(String),
                `items_item_brand` Nullable(String),
                `items_promotion_name` Nullable(String),
                `event_params_item_cat2` Nullable(String),
                `event` Nullable(String),
                `event_params_ux_ui` Nullable(String),
                `items_item_cat2` Nullable(String),
                `items_item_cat3` Nullable(String),
                `event_params_type` Nullable(String),
                `items_item_cat4` Nullable(String),
                `event_params_flow` Nullable(String),
                `event_params_segment` Nullable(String),
                `items_item_id` Nullable(String),
                `items_item_cat5` Nullable(String),
                `itemslocation_id` Nullable(String),
                `event_params_error` Nullable(String),
                `event_params_result` Nullable(String),
                `items_item_cat` Nullable(String),
                `event_params_quantity` Nullable(String),
                `event_params_id` Nullable(String),
                `items_item_variant` Nullable(String),
                `event_name` Nullable(String),
                `items_item_list_index` Nullable(String),
                -- `event_params_item_list_name` Nullable(String),
                -- `event_params_promo_name` Nullable(String),
                `deleted` UInt8
            )
            AS
            SELECT
                `event_type`,
                `source`,
                `timestamp`,
                `data.master_id` AS master_id,
                `data.event_params_app` AS event_params_app,
                `data.items_item_list_name` AS items_item_list_name,
                `data.event_params_parameter` AS event_params_parameter,
                `data.items_affiliation` AS items_affiliation,
                `data.event_params_item_cat` AS event_params_item_cat,
                `data.event_params_item_id` AS event_params_item_id,
                `data.event_params_step` AS event_params_step,
                `data.event_params_item_name` AS event_params_item_name,
                `data.event_params_dest` AS event_params_dest,
                `data.items_promotion_id` AS items_promotion_id,
                `data.step_number` AS step_number,
                `data.items_item_name` AS items_item_name,
                `data.event_params_item_variant` AS event_params_item_variant,
                `data.event_params_origin` AS event_params_origin,
                `data.event_params_way` AS event_params_way,
                `data.items_item_brand` AS items_item_brand,
                `data.items_promotion_name` AS items_promotion_name,
                `data.event_params_item_cat2` AS event_params_item_cat2,
                `data.event` AS event,
                `data.event_params_ux_ui` AS event_params_ux_ui,
                `data.items_item_cat2` AS items_item_cat2,
                `data.items_item_cat3` AS items_item_cat3,
                `data.event_params_type` AS event_params_type,
                `data.items_item_cat4` AS items_item_cat4,
                `data.event_params_flow` AS event_params_flow,
                `data.event_params_segment` AS event_params_segment,
                `data.items_item_id` AS items_item_id,
                `data.items_item_cat5` AS items_item_cat5,
                `data.itemslocation_id` AS itemslocation_id,
                `data.event_params_error` AS event_params_error,
                `data.event_params_result` AS event_params_result,
                `data.items_item_cat` AS items_item_cat,
                `data.event_params_quantity` AS event_params_quantity,
                `data.event_params_id` AS event_params_id,
                `data.items_item_variant` AS items_item_variant,
                `data.event_name` AS event_name,
                `data.items_item_list_index` AS items_item_list_index,
                -- `data.event_params_promo_name` AS event_params_promo_name,
                -- `data.event_params_item_list_name` AS event_params_item_list_name,
                CASE
                    WHEN event_type = 'UPDATE' THEN 0
                    WHEN event_type = 'DELETE' THEN 1
                    ELSE -1000
                END AS deleted
            FROM database_name.ga_events_kafka
            WHERE `data.master_id` IS NOT NULL AND `data.master_id` <> '';

Columns that should be added:

            `data.event_params_item_list_name` Nullable(String),
            `data.event_params_promo_name` Nullable(String)


Solution

  • input_format_avro_allow_missing_fields = 1 fixes the issue.

    Enables using fields that are not specified in Avro or AvroConfluent format schema. When a field is not found in the schema, ClickHouse uses the default value instead of throwing an exception.

    Possible values:

      • 0: Disabled.
      • 1: Enabled.

    Default value: 0.
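
As a rough sketch of how this could be applied to the Kafka table from the question: the example below assumes the setting can be placed in the Kafka table's SETTINGS clause, the same way format_avro_schema_registry_url is passed in the question. If your ClickHouse version does not accept it there, setting it in the user profile is the usual alternative. The column list is abbreviated, and the broker, topic, group and registry values are the question's placeholders.

```sql
-- Sketch only: abbreviated column list, placeholder connection values.
CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    `data.master_id` Nullable(String),
    -- ... the remaining `data.*` columns from the question go here ...
    `data.event_params_item_list_name` Nullable(String),  -- new column
    `data.event_params_promo_name` Nullable(String)       -- new column
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup',
    format_avro_schema_registry_url = 'registry_url',
    -- Assumption: format settings are accepted in this clause.
    -- Fields absent from a message's Avro schema get the column default
    -- (NULL for Nullable columns) instead of raising an exception.
    input_format_avro_allow_missing_fields = 1;
```

With this in place, messages written before the two fields were added to the schema should still be consumed, with NULL in the new columns. The materialized view and destination tables still need the two columns uncommented so the values are carried through.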