I have a standard mechanism to stream data from a Kafka topic into ClickHouse, i.e. a chain like this: Table (Kafka engine) -> Materialized view (TO) -> Destination table (MergeTree). It works well. I need to add two columns to this pipeline, but when I drop the chain of tables and recreate it with the new columns, data no longer arrives in the destination table, and there are no specific errors in the log pointing to a streaming problem. When I recreate the tables without the new columns, everything works again.

I found that the new columns exist in only one message; the other messages do not have them. I don't have much experience with Kafka, but my understanding is that no new data is loaded into the destination table because not every Kafka message contains the new columns. What can be done to resolve this?

I also tried the JSONEachRow format to stream the raw data into a single column and handle it with ClickHouse JSON functions, but that doesn't work either: it fails with a specific error that the input data cannot be parsed. Should some settings perhaps be changed on the Kafka side? (Again, I can't see how it looks from the Kafka side; I'm using Offset Explorer to view the data.)
/* -------------------------------- CREATE DESTINATION TABLE (LOCAL) -------------------------------- */
CREATE TABLE database_name.ga_events_local ON CLUSTER 'cluste_name'
(
    `event_type` String,
    `source` String,
    `timestamp` UInt32,
    `master_id` String,
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    --`event_params_item_list_name` Nullable(String),
    --`event_params_promo_name` Nullable(String),
    `deleted` UInt8 --MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/ga_events', '{replica}', timestamp, `deleted`)
ORDER BY `master_id`
SETTINGS index_granularity = 8192;
/* -------------------------------- CREATE DESTINATION TABLE (DISTRIBUTED) -------------------------------- */
CREATE TABLE database_name.ga_events ON CLUSTER 'cluste_name'
(
    `event_type` String,
    `source` String,
    `timestamp` UInt32,
    `master_id` String,
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    --`event_params_item_list_name` Nullable(String),
    --`event_params_promo_name` Nullable(String),
    `deleted` UInt8 --MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
)
ENGINE = Distributed('cluste_name', 'database_name', 'ga_events_local', javaHash(`master_id`));
/* -------------------------------- CREATE KAFKA TABLE -------------------------------- */
CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    `data.master_id` Nullable(String),
    `data.event_params_app` Nullable(String),
    `data.items_item_list_name` Nullable(String),
    `data.event_params_parameter` Nullable(String),
    `data.items_affiliation` Nullable(String),
    `data.event_params_item_cat` Nullable(String),
    `data.event_params_item_id` Nullable(String),
    `data.event_params_step` Nullable(String),
    `data.event_params_item_name` Nullable(String),
    `data.event_params_dest` Nullable(String),
    `data.items_promotion_id` Nullable(String),
    `data.step_number` Nullable(String),
    `data.items_item_name` Nullable(String),
    `data.event_params_item_variant` Nullable(String),
    `data.event_params_origin` Nullable(String),
    `data.event_params_way` Nullable(String),
    `data.items_item_brand` Nullable(String),
    `data.items_promotion_name` Nullable(String),
    `data.event_params_item_cat2` Nullable(String),
    `data.event` Nullable(String),
    `data.event_params_ux_ui` Nullable(String),
    `data.items_item_cat2` Nullable(String),
    `data.items_item_cat3` Nullable(String),
    `data.event_params_type` Nullable(String),
    `data.items_item_cat4` Nullable(String),
    `data.event_params_flow` Nullable(String),
    `data.event_params_segment` Nullable(String),
    `data.items_item_id` Nullable(String),
    `data.items_item_cat5` Nullable(String),
    `data.itemslocation_id` Nullable(String),
    `data.event_params_error` Nullable(String),
    `data.event_params_result` Nullable(String),
    `data.items_item_cat` Nullable(String),
    `data.event_params_quantity` Nullable(String),
    `data.event_params_id` Nullable(String),
    `data.items_item_variant` Nullable(String),
    `data.event_name` Nullable(String),
    `data.items_item_list_index` Nullable(String) --,
    --`data.event_params_item_list_name` Nullable(String),
    --`data.event_params_promo_name` Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093,somebroker:9093,somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup',   -- I change this every time I recreate the tables
    format_avro_schema_registry_url = 'registry_url',
    kafka_client_id = 'someclient';   -- I change this every time I recreate the tables
/* -------------------------------- CREATE MATERIALIZED VIEW -------------------------------- */
CREATE MATERIALIZED VIEW database_name.ga_events_kafka_mv TO database_name.ga_events
(
    `event_type` String,
    `source` String,
    `timestamp` DateTime64(3),
    `master_id` Nullable(String),
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    --`event_params_item_list_name` Nullable(String),
    --`event_params_promo_name` Nullable(String),
    `deleted` UInt8
)
AS
SELECT
    `event_type`,
    `source`,
    `timestamp`,
    `data.master_id` AS master_id,
    `data.event_params_app` AS event_params_app,
    `data.items_item_list_name` AS items_item_list_name,
    `data.event_params_parameter` AS event_params_parameter,
    `data.items_affiliation` AS items_affiliation,
    `data.event_params_item_cat` AS event_params_item_cat,
    `data.event_params_item_id` AS event_params_item_id,
    `data.event_params_step` AS event_params_step,
    `data.event_params_item_name` AS event_params_item_name,
    `data.event_params_dest` AS event_params_dest,
    `data.items_promotion_id` AS items_promotion_id,
    `data.step_number` AS step_number,
    `data.items_item_name` AS items_item_name,
    `data.event_params_item_variant` AS event_params_item_variant,
    `data.event_params_origin` AS event_params_origin,
    `data.event_params_way` AS event_params_way,
    `data.items_item_brand` AS items_item_brand,
    `data.items_promotion_name` AS items_promotion_name,
    `data.event_params_item_cat2` AS event_params_item_cat2,
    `data.event` AS event,
    `data.event_params_ux_ui` AS event_params_ux_ui,
    `data.items_item_cat2` AS items_item_cat2,
    `data.items_item_cat3` AS items_item_cat3,
    `data.event_params_type` AS event_params_type,
    `data.items_item_cat4` AS items_item_cat4,
    `data.event_params_flow` AS event_params_flow,
    `data.event_params_segment` AS event_params_segment,
    `data.items_item_id` AS items_item_id,
    `data.items_item_cat5` AS items_item_cat5,
    `data.itemslocation_id` AS itemslocation_id,
    `data.event_params_error` AS event_params_error,
    `data.event_params_result` AS event_params_result,
    `data.items_item_cat` AS items_item_cat,
    `data.event_params_quantity` AS event_params_quantity,
    `data.event_params_id` AS event_params_id,
    `data.items_item_variant` AS items_item_variant,
    `data.event_name` AS event_name,
    `data.items_item_list_index` AS items_item_list_index,
    --`data.event_params_promo_name` AS event_params_promo_name,
    --`data.event_params_item_list_name` AS event_params_item_list_name,
    CASE
        WHEN event_type = 'UPDATE' THEN 0
        WHEN event_type = 'DELETE' THEN 1
        ELSE -1000
    END AS deleted
FROM database_name.ga_events_kafka
WHERE `data.master_id` IS NOT NULL AND `data.master_id` <> '';
The columns that should be added (currently commented out above):

`data.event_params_item_list_name` Nullable(String),
`data.event_params_promo_name` Nullable(String)
Setting input_format_avro_allow_missing_fields = 1 fixes the issue. It enables the use of fields that are not present in the Avro or AvroConfluent schema of a given message: when a field is not found in the schema, ClickHouse inserts the column's default value instead of throwing an exception.

Possible values: 0 — disabled, 1 — enabled. Default value: 0.
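For example, the setting can be added to the Kafka engine table's SETTINGS clause so that messages missing the two new fields still parse (a minimal sketch with the column list abbreviated; broker, topic, group, and registry values are placeholders from the question; depending on the ClickHouse version the setting can also be enabled in the user profile instead):

```sql
CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    -- ... remaining data.* columns as above, including the two new ones:
    `data.event_params_item_list_name` Nullable(String),
    `data.event_params_promo_name` Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup',
    format_avro_schema_registry_url = 'registry_url',
    -- use column defaults (NULL for the Nullable columns) when a message's
    -- Avro schema lacks the new fields, instead of failing to parse:
    input_format_avro_allow_missing_fields = 1;
```

With this in place there is no need to drop and recreate the chain again when only some messages carry the new fields; rows from old-schema messages simply get NULL in the new columns.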