
Populating a Materialized View in ClickHouse exceeds Memory limit


I'm trying to create a materialized view using the ReplicatedAggregatingMergeTree engine on a table that uses a ReplicatedMergeTree engine.

After a few million rows I get DB::Exception: Memory limit (for query) exceeded. Is there a way to work around this?

CREATE MATERIALIZED VIEW IF NOT EXISTS shared.aggregated_calls_1h
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/shared/aggregated_calls_1h', '{replica}')
PARTITION BY toRelativeDayNum(retained_until_date)
ORDER BY (
  client_id,
  t,
  is_synthetic,
  source_application_ids,
  source_service_id,
  source_endpoint_id,
  destination_application_ids,
  destination_service_id,
  destination_endpoint_id,
  boundary_application_ids,
  process_snapshot_id,
  docker_snapshot_id,
  host_snapshot_id,
  cluster_snapshot_id,
  http_status
)
SETTINGS index_granularity = 8192
POPULATE
AS
SELECT
  client_id,
  toUInt64(floor(t / (60000 * 60)) * (60000 * 60)) AS t,
  date,
  toDate(retained_until_timestamp / 1000) AS retained_until_date,
  is_synthetic,
  source_application_ids,
  source_service_id,
  source_endpoint_id,
  destination_application_ids,
  destination_service_id,
  destination_endpoint_id,
  boundary_application_ids,
  http_status,
  process_snapshot_id,
  docker_snapshot_id,
  host_snapshot_id,
  cluster_snapshot_id,
  any(destination_endpoint) AS destination_endpoint,
  any(destination_endpoint_type) AS destination_endpoint_type,
  groupUniqArrayArrayState(destination_technologies) AS destination_technologies_state,
  minState(ingestion_time) AS min_ingestion_time_state,
  sumState(batchCount) AS sum_call_count_state,
  sumState(errorCount) AS sum_error_count_state,
  sumState(duration) AS sum_duration_state,
  minState(toUInt64(ceil(duration/batchCount))) AS min_duration_state,
  maxState(toUInt64(ceil(duration/batchCount))) AS max_duration_state,
  quantileTimingWeightedState(0.25)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p25_state,
  quantileTimingWeightedState(0.50)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p50_state,
  quantileTimingWeightedState(0.75)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p75_state,
  quantileTimingWeightedState(0.90)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p90_state,
  quantileTimingWeightedState(0.95)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p95_state,
  quantileTimingWeightedState(0.98)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p98_state,
  quantileTimingWeightedState(0.99)(toUInt64(ceil(duration/batchCount)), batchCount) AS latency_p99_state,
  quantileTimingWeightedState(0.25)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p25_large_state,
  quantileTimingWeightedState(0.50)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p50_large_state,
  quantileTimingWeightedState(0.75)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p75_large_state,
  quantileTimingWeightedState(0.90)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p90_large_state,
  quantileTimingWeightedState(0.95)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p95_large_state,
  quantileTimingWeightedState(0.98)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p98_large_state,
  quantileTimingWeightedState(0.99)(toUInt64(ceil(duration/batchCount)/100), batchCount) AS latency_p99_large_state,
  sumState(minSelfTime) AS sum_min_self_time_state
FROM shared.calls_v2
WHERE sample_type != 'user_selected'
GROUP BY
  client_id,
  t,
  date,
  retained_until_date,
  is_synthetic,
  source_application_ids,
  source_service_id,
  source_endpoint_id,
  destination_application_ids,
  destination_service_id,
  destination_endpoint_id,
  boundary_application_ids,
  process_snapshot_id,
  docker_snapshot_id,
  host_snapshot_id,
  cluster_snapshot_id,
  http_status
HAVING destination_endpoint_type != 'INTERNAL'

Solution

  • I have been using the ClickHouse database ("CH") for a long time (since at least ~2019).
    Running out of memory/RAM on GROUP BY / ORDER BY clauses happens quite frequently in CH, which is often unexpected because other DBs (especially "normal" RDBMSs like e.g. Postgres/Oracle/MariaDB/DB2/etc...) would mostly handle this by writing & reading temporary data to & from disk.
    I came across this issue in CH again last week when building & populating a materialized view ("MTW") over a data set of ~9 billion rows (a 10% random selection of the underlying table hosting ~89 billion rows).
    Here are the takeaways from my investigation.

    Summary:

    (assumption: you use a dedicated user to build your MTW, so the settings mentioned below are changed only for that user and therefore don't impact other users; a short SQL sketch of these per-user changes follows the list)

    • config setting "optimize_aggregation_in_order"

      1. Stop all "write"-activities happening against the base table.
        (reason: potential bugs)
      2. Activate "optimize_aggregation_in_order" by setting it to "1".
      3. Disconnect&reconnect.
      4. Build your MTW.
      5. Disable "optimize_aggregation_in_order" by setting it to "0".
      6. Disconnect&reconnect.
    • config setting "max_threads"

      1. Stop all write&read activities for the user being used to create the MTW.
      2. Set "max_threads" to just "1".
      3. Create the MTW.
      4. Set "max_threads" to its original value.
    • upgrade your CH software to v23.10.3.5 or higher.
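
    As a minimal SQL sketch of the per-user setting changes from the first two bullets (assuming SQL-managed users and a hypothetical dedicated user named "mtw_builder"; the same settings can instead be put into that user's profile in the XML config, as shown further below):

    -- enable "optimize_aggregation_in_order" (or alternatively set "max_threads" to 1)
    -- only for the dedicated user that is used to build the MTW
    ALTER USER mtw_builder SETTINGS optimize_aggregation_in_order = 1
    -- disconnect & reconnect as that user, create the MTW with POPULATE, then revert:
    ALTER USER mtw_builder SETTINGS optimize_aggregation_in_order = 0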

    Details:

    I ran into this problem again a few weeks ago while using CH v22.11.2.30.
    This time I decided to dig a bit deeper into it.

    1) CH v22.11.2.30 vs. v23.10.3.5:

    Apparently some fairly big changes were integrated between these two versions: the testcase I had prepared while using v22.11.2.30 no longer worked after upgrading to v23.10.3.5, so I had to prepare a new one.

    I also noticed that while v22 almost always stuck religiously to the limit set by the config value "max_memory_usage", v23 seems to be a lot more relaxed in that regard.

    In v22 the files created in the temporary-files directory (cfg setting "tmp_path") kept getting smaller and smaller the longer the query ran, until in my case everything crashed, leaving me with 4.5 million files in that directory. Deleting them with the "find" command worked, but even afterwards the ZFS filesystem kept generating a lot of I/O when just doing an "ls" of the (now) empty directory; only deleting that temporary directory and recreating it fixed the problem (interesting). In v23 this problem seems to be fixed.

    2) Testcase:

    Simplified example (created by using v23.10.3.5):
    2a) Create the base table:
    create table test_tbl
    (
    num_hashed_as_hex String CODEC(ZSTD(1))
    ,some_other_col1 UInt32 CODEC(ZSTD(1))
    ,some_other_col2 UInt32 CODEC(ZSTD(1))
    ,some_other_col3 UInt32 CODEC(ZSTD(1))
    )
    engine=MergeTree()
    primary key (num_hashed_as_hex)
    order by (num_hashed_as_hex)
    settings index_granularity=8192,min_merge_bytes_to_use_direct_io=999999999999999999
    

    2b) Populate it with 5B rows (500M unique "num_hashed_as_hex", each repeated 10 times):

    insert into test_tbl
    select num_hash_string, num_rand_uint32, num_rand_uint32, num_rand_uint32
    from
    (
      select hex(murmurHash2_64(num2hash.number)) num_hash_string, rand32() num_rand_uint32
      FROM numbers(0,500000000) as num2hash, numbers(0,10) repeat_each_hash
    )
    

    2c) Wait until 2b has finished AND all running merges have finished too (monitor the activity in "system.merges").
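
    For example, something like this can be used to watch the ongoing merges (the filter on the table name is just for this testcase):

    select database, table, elapsed, round(progress, 2) as progress, num_parts
    from system.merges
    where table = 'test_tbl'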

    2d) Simplify the testcase by merging all parts into a single one:

    optimize table test_tbl final
    

    2e) Try to create the materialized view, populating it with data as well (the "POPULATE" clause in the SQL below):

    CREATE MATERIALIZED VIEW test_mvw_2
    (
    num_hashed_as_hex String CODEC(ZSTD(1))
    ,my_avg AggregateFunction(avg, UInt32) CODEC(ZSTD(1))
    ,my_sum AggregateFunction(sum, UInt32) CODEC(ZSTD(1))
    )
    ENGINE = AggregatingMergeTree()
    ORDER BY (num_hashed_as_hex)
    settings index_granularity=8192,min_merge_bytes_to_use_direct_io=999999999999999999
    POPULATE
    AS
    select
    num_hashed_as_hex
    ,avgState(some_other_col1) AS my_avg
    ,sumState(some_other_col2) AS my_sum
    from test_tbl
    group by num_hashed_as_hex
    

    3) How to make #2e work:

    3a) Use only 1 thread:
    (safest but slowest option)
    With a VM that had a total of only 20GiB of RAM available and the CH user's config set as follows...

    <max_memory_usage>3000000000</max_memory_usage>
    <max_bytes_before_external_sort>100000000</max_bytes_before_external_sort>
    <max_bytes_before_external_group_by>1000000000</max_bytes_before_external_group_by>
    <max_threads>1</max_threads>
    

    ...the MTW is created successfully when using only 1 thread.
    While the MTW is being populated please keep an eye on the contents of your temp-directory (setting "tmp_path").
    As soon as I change "max_threads" to anything higher than "1" then I start getting the usual error message:

    Received exception from server (version 23.10.3): Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 2.84 GiB (attempt to allocate chunk of 134217728 bytes), maximum: 2.79 GiB.. (MEMORY_LIMIT_EXCEEDED)

    To be exact, the execution succeeds while creating the temporary files in the temp-directory, but fails when CH tries to merge them and insert the data into the MTW.
    This happens as well when I e.g. double the VM's RAM to 40GiB and greatly increase the "max_memory_usage" limit. I have no clue why the increase in RAM usage is not linear with the number of threads.
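
    As a side note, one way to compare such attempts afterwards is to look them up in "system.query_log" (assuming query logging is enabled and a version recent enough to have the "thread_ids" column), e.g. to see the memory and the number of threads used by each run:

    select event_time, length(thread_ids) as threads, formatReadableSize(memory_usage) as mem, query_duration_ms
    from system.query_log
    where type = 'QueryFinish' and query ilike '%test_mvw_2%'
    order by event_time desc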

    3b) Temporarily enable "optimize_aggregation_in_order":
    (fast but potentially dangerous?)
    On the one hand, when "optimize_aggregation_in_order" is disabled and you simulate the creation of the above MTW and look at its execution plan, you'll see something like this...

    EXPLAIN PIPELINE
    SELECT *
    FROM
    (
        SELECT
            num_hashed_as_hex,
            avgState(some_other_col1),
            sumState(some_other_col2)
        FROM test_tbl
        GROUP BY num_hashed_as_hex
    )
    LIMIT 10
    
    Query id: c9475022-a2d9-4d32-bcfc-701562ed5edc
    
    ┌─explain──────────────────────────────────────────────────────────────────┐
    │ (Expression)                                                             │
    │ ExpressionTransform                                                      │
    │   (Limit)                                                                │
    │   Limit                                                                  │
    │     (Aggregating)                                                        │
    │     AggregatingTransform                                                 │
    │       (Expression)                                                       │
    │       ExpressionTransform                                                │
    │         (ReadFromMergeTree)                                              │
    │         MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1 │
    └──────────────────────────────────────────────────────────────────────────┘
    

    ... and if you try to execute the query you'll see that temporary files are being created in the temporary directory, and you'll have to wait a very long time to get the results.
    On the other hand, if you activate "optimize_aggregation_in_order", disconnect&reconnect, and look at the execution plan, you'll see this:

    EXPLAIN PIPELINE
    SELECT *
    FROM
    (
        SELECT
            num_hashed_as_hex,
            avgState(some_other_col1),
            sumState(some_other_col2)
        FROM test_tbl
        GROUP BY num_hashed_as_hex
    )
    LIMIT 10
    
    Query id: 8e653b68-a11c-4423-92d9-05ad7fbb1d10
    
    ┌─explain────────────────────────────────────────────────────────────────────┐
    │ (Expression)                                                               │
    │ ExpressionTransform                                                        │
    │   (Limit)                                                                  │
    │   Limit                                                                    │
    │     (Aggregating)                                                          │
    │     FinalizeAggregatedTransform                                            │
    │       AggregatingInOrderTransform                                          │
    │         (Expression)                                                       │
    │         ExpressionTransform                                                │
    │           (ReadFromMergeTree)                                              │
    │           MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1 │
    └────────────────────────────────────────────────────────────────────────────┘
    

    If you now execute the SQL you will get the result immediately, because CH knows that the records in the base table are already ordered by "num_hashed_as_hex" (the "...InOrder..." mentioned in the exec plan) and leverages that, without having to first scan all the data, create temp records and merge them, and therefore almost without using any RAM at all for temp operations.
    Populating the MTW basically becomes a "streaming" operation; in my case CH was using just ~1.5GiB of RAM the whole time.
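
    If you want to verify this yourself while the POPULATE is running, a quick check from a second session looks e.g. like this (the ILIKE filter is just one way to find the populating query):

    select query_id, elapsed, formatReadableSize(memory_usage) as mem
    from system.processes
    where query ilike '%test_mvw_2%'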

    IMPORTANT-1:
    For this to work, the MTW's "group by" must use a column that its base table also uses to sort its records => if it does, the MTW starts receiving the aggregated records from its base table almost immediately and is populated almost without using any RAM (pls. monitor "system.merges" for the ongoing merges of the MTW's parts).
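
    A quick way to check which columns a base table is sorted by (and can therefore be aggregated "in order") is e.g.:

    select sorting_key
    from system.tables
    where database = currentDatabase() and name = 'test_tbl'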

    IMPORTANT-2:
    I mention "disconnect&reconnect" because in my case when modifying the setting the execution plan changes but if I don't first disconnect the client then the SQL is apparently still executed "the old way", I don't know why.

    WARNING:
    "optimize_aggregation_in_order" seems to have some bugs (which is the reason why this setting isn't currently turned on by default). I found one in Github which mentioned that the MTW wasn't updated when records were written against its partitioned base table.
    I therefore recommend to use it only when the MTW is populated at creation time and while no write-ops are being executed against its base table.

    Hope this helps... :)