I'm trying to create a materialized view using the ReplicatedAggregatingMergeTree engine on top of a table that uses the ReplicatedMergeTree engine.
After a few million rows I get DB::Exception: Memory limit (for query) exceeded. Is there a way to work around this?
CREATE MATERIALIZED VIEW IF NOT EXISTS shared.aggregated_calls_1h
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/shared/aggregated_calls_1h', '{replica}')
PARTITION BY toRelativeDayNum(retained_until_date)
ORDER BY (
client_id,
t,
is_synthetic,
source_application_ids,
source_service_id,
source_endpoint_id,
destination_application_ids,
destination_service_id,
destination_endpoint_id,
boundary_application_ids,
process_snapshot_id,
docker_snapshot_id,
host_snapshot_id,
cluster_snapshot_id,
http_status
)
SETTINGS index_granularity = 8192
POPULATE
AS
SELECT
client_id,
toUInt64(floor(t / (60000 * 60)) * (60000 * 60)) AS t,
date,
toDate(retained_until_timestamp / 1000) AS retained_until_date,
is_synthetic,
source_application_ids,
source_service_id,
source_endpoint_id,
destination_application_ids,
destination_service_id,
destination_endpoint_id,
boundary_application_ids,
http_status,
process_snapshot_id,
docker_snapshot_id,
host_snapshot_id,
cluster_snapshot_id,
any(destination_endpoint) AS destination_endpoint,
any(destination_endpoint_type) AS destination_endpoint_type,
groupUniqArrayArrayState(destination_technologies) AS destination_technologies_state,
minState(ingestion_time) AS min_ingestion_time_state,
sumState(batchCount) AS sum_call_count_state,
sumState(errorCount) AS sum_error_count_state,
sumState(duration) AS sum_duration_state,
minState(toUInt64(ceil(duration/batchCount))) AS min_duration_state,
maxState(toUInt64(ceil(duration/batchCount))) AS max_duration_state,
quantileTimingWeightedState(0.25)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p25_state,
quantileTimingWeightedState(0.50)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p50_state,
quantileTimingWeightedState(0.75)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p75_state,
quantileTimingWeightedState(0.90)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p90_state,
quantileTimingWeightedState(0.95)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p95_state,
quantileTimingWeightedState(0.98)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p98_state,
quantileTimingWeightedState(0.99)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p99_state,
quantileTimingWeightedState(0.25)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p25_large_state,
quantileTimingWeightedState(0.50)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p50_large_state,
quantileTimingWeightedState(0.75)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p75_large_state,
quantileTimingWeightedState(0.90)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p90_large_state,
quantileTimingWeightedState(0.95)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p95_large_state,
quantileTimingWeightedState(0.98)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p98_large_state,
quantileTimingWeightedState(0.99)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p99_large_state,
sumState(minSelfTime) AS sum_min_self_time_state
FROM shared.calls_v2
WHERE sample_type != 'user_selected'
GROUP BY
client_id,
t,
date,
retained_until_date,
is_synthetic,
source_application_ids,
source_service_id,
source_endpoint_id,
destination_application_ids,
destination_service_id,
destination_endpoint_id,
boundary_application_ids,
process_snapshot_id,
docker_snapshot_id,
host_snapshot_id,
cluster_snapshot_id,
http_status
HAVING destination_endpoint_type != 'INTERNAL'
I have been using the ClickHouse database ("CH") for a long time (since at least ~2019).
Running out of memory/RAM on GROUP BY / ORDER BY clauses happens quite frequently in CH, which is often unexpected, as other DBs (especially "normal" RDBMSs like Postgres, Oracle, MariaDB, DB2, etc.) would mostly handle this by transparently writing and reading temporary data to and from disk.
I ran into this issue in CH again last week while building and populating a materialized view ("MV") against a set of ~9 billion rows (a 10% random selection of the underlying table, which holds ~89 billion rows).
Here are the takeaways from my investigation.
(Assumption: you use a dedicated user to build your MV, so the settings mentioned below are changed only for that user and do not impact anybody else.)
1) The levers, in short:
- config setting "optimize_aggregation_in_order"
- config setting "max_threads"
- upgrading your CH software to v23.10.3.5 or higher
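For reference, you can check which values these settings currently have for your session via the "system.settings" table (a minimal sketch):
select name, value, changed
from system.settings
where name in ('optimize_aggregation_in_order','max_threads','max_memory_usage','max_bytes_before_external_group_by')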
2a) Create the test table:
create table test_tbl
(
num_hashed_as_hex String CODEC(ZSTD(1))
,some_other_col1 UInt32 CODEC(ZSTD(1))
,some_other_col2 UInt32 CODEC(ZSTD(1))
,some_other_col3 UInt32 CODEC(ZSTD(1))
)
engine=MergeTree()
primary key (num_hashed_as_hex)
order by (num_hashed_as_hex)
settings index_granularity=8192,min_merge_bytes_to_use_direct_io=999999999999999999
2b) Populate it with 5B rows (500M unique "num_hashed_as_hex", each repeated 10 times):
insert into test_tbl
select num_hash_string, num_rand_uint32, num_rand_uint32, num_rand_uint32
from
(
select hex(murmurHash2_64(num2hash.number)) num_hash_string, rand32() num_rand_uint32
FROM numbers(0,500000000) as num2hash, numbers(0,10) repeat_each_hash
)
2c) Wait until 2b has finished, and until all running merges have finished too (monitor the activity in "system.merges").
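For example, something like this shows the merges that are still running, their progress, and how much memory they are using:
select table, elapsed, progress, num_parts, formatReadableSize(memory_usage) as mem
from system.merges
where table = 'test_tbl'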
2d) Simplify the testcase by merging all parts into a single one:
optimize table test_tbl final
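At this point a quick sanity check of the test data doesn't hurt ("uniq" is an approximate count, which keeps the check cheap on 5B rows):
select count() as total_rows, uniq(num_hashed_as_hex) as approx_unique_hashes
from test_tbl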
2e) Try to create the materialized view, populating it with data as well (the "POPULATE" clause in the SQL below):
CREATE MATERIALIZED VIEW test_mvw_2
(
num_hashed_as_hex String CODEC(ZSTD(1))
,my_avg AggregateFunction(avg, UInt32) CODEC(ZSTD(1))
,my_sum AggregateFunction(sum, UInt32) CODEC(ZSTD(1))
)
ENGINE = AggregatingMergeTree()
ORDER BY (num_hashed_as_hex)
settings index_granularity=8192,min_merge_bytes_to_use_direct_io=999999999999999999
POPULATE
AS
select
num_hashed_as_hex
,avgState(some_other_col1) as my_avg
,sumState(some_other_col2) as my_sum
from test_tbl
group by num_hashed_as_hex
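Side note: since the MV stores intermediate aggregation states (AggregateFunction columns), reading from it later requires the matching "-Merge" combinators, e.g.:
select
num_hashed_as_hex
,avgMerge(my_avg) as avg_final
,sumMerge(my_sum) as sum_final
from test_mvw_2
group by num_hashed_as_hex
limit 10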
3a) Use only 1 thread:
(safest but slowest option)
On a VM that had a total of only 20 GiB of RAM available, and with the CH user's config set as follows...
<max_memory_usage>3000000000</max_memory_usage>
<max_bytes_before_external_sort>100000000</max_bytes_before_external_sort>
<max_bytes_before_external_group_by>1000000000</max_bytes_before_external_group_by>
<max_threads>1</max_threads>
...the MV is created successfully when using only 1 thread.
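If you prefer not to touch the server-side user config, the same limits can also be set at session level right before running the CREATE ... POPULATE statement (an equivalent sketch of the XML above):
set max_memory_usage = 3000000000;
set max_bytes_before_external_sort = 100000000;
set max_bytes_before_external_group_by = 1000000000;
set max_threads = 1;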
While the MV is being populated, keep an eye on the contents of your temp directory (setting "tmp_path").
As soon as I change "max_threads" to anything higher than "1", I start getting the usual error message:
Received exception from server (version 23.10.3): Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 2.84 GiB (attempt to allocate chunk of 134217728 bytes), maximum: 2.79 GiB.. (MEMORY_LIMIT_EXCEEDED)
To be exact, the execution succeeds while creating the temporary files in the temp directory, but fails when CH tries to merge them and insert the data into the MV.
This happens as well when I, for example, double the VM's RAM to 40 GiB and raise the "max_memory_usage" limit a lot. I have no clue why RAM usage does not grow linearly with the number of threads.
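To see how much memory a given attempt actually peaked at, you can check "system.query_log" (assuming the query log is enabled, which it is by default):
select event_time, type, formatReadableSize(memory_usage) as peak_mem
from system.query_log
where type in ('QueryFinish','ExceptionWhileProcessing') and query like '%test_mvw_2%'
order by event_time desc
limit 5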
3b) Temporarily enable "optimize_aggregation_in_order":
(fast but potentially dangerous?)
On the one hand, when "optimize_aggregation_in_order" is disabled and you simulate the creation of the above MV and look at its execution plan, you'll see something like this...
EXPLAIN PIPELINE
SELECT *
FROM
(
SELECT
num_hashed_as_hex,
avgState(some_other_col1),
sumState(some_other_col2)
FROM test_tbl
GROUP BY num_hashed_as_hex
)
LIMIT 10
Query id: c9475022-a2d9-4d32-bcfc-701562ed5edc
┌─explain──────────────────────────────────────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Limit) │
│ Limit │
│ (Aggregating) │
│ AggregatingTransform │
│ (Expression) │
│ ExpressionTransform │
│ (ReadFromMergeTree) │
│ MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1 │
└──────────────────────────────────────────────────────────────────────────┘
... and if you try to execute the query, you'll see that temporary files are being created in the temporary directory, and you'll have to wait a very long time for the results.
On the other hand, if you activate "optimize_aggregation_in_order", disconnect & reconnect, and look at the execution plan, you'll see this:
EXPLAIN PIPELINE
SELECT *
FROM
(
SELECT
num_hashed_as_hex,
avgState(some_other_col1),
sumState(some_other_col2)
FROM test_tbl
GROUP BY num_hashed_as_hex
)
LIMIT 10
Query id: 8e653b68-a11c-4423-92d9-05ad7fbb1d10
┌─explain────────────────────────────────────────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Limit) │
│ Limit │
│ (Aggregating) │
│ FinalizeAggregatedTransform │
│ AggregatingInOrderTransform │
│ (Expression) │
│ ExpressionTransform │
│ (ReadFromMergeTree) │
│ MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1 │
└────────────────────────────────────────────────────────────────────────────┘
If you now execute the SQL, you get the result almost immediately, because CH knows that the records in the base table are already ordered by "num_hashed_as_hex" (the "...InOrder..." mentioned in the execution plan) and leverages that: it does not have to first scan all the data, create temporary records, and merge them, and therefore uses almost no RAM for temporary operations.
Populating the MV basically becomes a "streaming" operation; in my case CH used just ~1.5 GiB of RAM the whole time.
IMPORTANT-1:
For this to work, the MV's GROUP BY must use a column by which the base table sorts its records (i.e., the base table's ORDER BY key). If it does, the MV starts receiving the aggregated records from its base table almost immediately and is populated without using much RAM (please monitor "system.merges" for the ongoing merges of the MV's parts).
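You can compare the MV's GROUP BY against the base table's sorting key like this:
select name, sorting_key
from system.tables
where database = currentDatabase() and name = 'test_tbl'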
IMPORTANT-2:
I mention "disconnect & reconnect" because in my case, after modifying the setting, the execution plan changed, but if I didn't first disconnect the client, the SQL was apparently still executed "the old way"; I don't know why.
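As an alternative to reconnecting, attaching the setting directly to the statement should have the same effect, since ClickHouse accepts a per-query SETTINGS clause (a sketch against the testcase above):
select
num_hashed_as_hex
,avgState(some_other_col1)
,sumState(some_other_col2)
from test_tbl
group by num_hashed_as_hex
settings optimize_aggregation_in_order = 1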
WARNING:
"optimize_aggregation_in_order" seems to have some bugs (which is the reason why this setting isn't currently turned on by default). I found one in Github which mentioned that the MTW wasn't updated when records were written against its partitioned base table.
I therefore recommend to use it only when the MTW is populated at creation time and while no write-ops are being executed against its base table.
Hope this helps... :)