apache-spark · databricks · delta-lake

Specify retention of 'x versions' of a delta table?


VACUUM removes Delta table data files that are no longer referenced in the latest state of the table's transaction log and are older than a retention threshold. That threshold can be specified explicitly during the VACUUM operation (VACUUM table_name [RETAIN num HOURS]) or implicitly via the table property delta.deletedFileRetentionDuration = 'interval X days'. However, it appears the threshold can only be specified as a duration.

I'd like to guarantee retention of at least x versions of a Delta table. Is there a workaround to enable this, ideally without synchronizing VACUUM and write operations? I'd like to VACUUM the table on a schedule, but the table can be written to at random times.

Say I have delta.deletedFileRetentionDuration = 'interval 7 days', the table has not been written to for over a week, and a scheduled background process then runs VACUUM: only the latest version of the table would remain, right? That behavior is not desired in my case; we'd like to guarantee at least 2 versions.
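For illustration, a minimal sketch of both ways to set the threshold, assuming an active SparkSession named spark and an illustrative table name events:

    # Implicit default for all future VACUUM runs on this table.
    spark.sql("""
        ALTER TABLE events
        SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 7 days')
    """)

    # Explicit one-off threshold for a single VACUUM run (168 hours = 7 days).
    spark.sql("VACUUM events RETAIN 168 HOURS")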


Solution

  • You can compute the retention period (in hours) that covers the last X versions by examining the Delta table history, then pass that value to the VACUUM command. Below is a PySpark version of the calculation.

    from delta import DeltaTable
    from pyspark.sql import functions as F, types as T
    
    # 'spark' is an existing SparkSession (predefined in Databricks notebooks)
    path = 'path_to_your_table'
    delta_table = DeltaTable.forPath(spark, path)
    history = delta_table.history(10)  # set the number of versions to keep here
    
    # Age of each version as a DAY TO SECOND interval; casting the interval
    # to long yields total seconds, so dividing by 3600 and rounding up
    # gives the whole hours of retention needed to keep that version.
    diff_to_now = F.current_timestamp() - F.col('timestamp')
    necessary_retention = F.ceil(diff_to_now.cast(T.LongType()) / F.lit(3600))
    cols = [
        'version',
        'timestamp',
        diff_to_now.alias('diff_to_now'),
        necessary_retention.alias('necessary_retention_hours'),
    ]
    history_2 = history.select(cols)
    history_2.show(10, False)
    
    # +-------+-------------------+------------------------------------------+-------------------------+
    # |version|timestamp          |diff_to_now                               |necessary_retention_hours|
    # +-------+-------------------+------------------------------------------+-------------------------+
    # |578    |2024-02-08 10:16:58|INTERVAL '0 00:02:49.869462' DAY TO SECOND|1                        |
    # |577    |2024-02-08 10:10:30|INTERVAL '0 00:09:17.869462' DAY TO SECOND|1                        |
    # |576    |2024-02-08 09:44:03|INTERVAL '0 00:35:44.869462' DAY TO SECOND|1                        |
    # |575    |2024-02-08 09:36:48|INTERVAL '0 00:42:59.869462' DAY TO SECOND|1                        |
    # |574    |2024-02-08 09:29:20|INTERVAL '0 00:50:27.869462' DAY TO SECOND|1                        |
    # |573    |2024-02-08 09:22:52|INTERVAL '0 00:56:55.869462' DAY TO SECOND|1                        |
    # |572    |2024-02-08 09:17:31|INTERVAL '0 01:02:16.869462' DAY TO SECOND|2                        |
    # |571    |2024-02-08 09:05:52|INTERVAL '0 01:13:55.869462' DAY TO SECOND|2                        |
    # |570    |2024-02-08 08:56:58|INTERVAL '0 01:22:49.869462' DAY TO SECOND|2                        |
    # |569    |2024-02-08 08:49:09|INTERVAL '0 01:30:38.869462' DAY TO SECOND|2                        |
    # +-------+-------------------+------------------------------------------+-------------------------+
    
    # min_by returns necessary_retention_hours from the row with the lowest
    # version number, i.e. the oldest of the fetched versions, which needs
    # the longest retention window.
    retention_df = history_2.select(
        F.min_by('necessary_retention_hours', 'version').alias('retention')
    )
    retention = retention_df.collect()[0]['retention']
    print(retention)
    # >>> 2
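
  • A sketch of applying the computed value, assuming the snippet above has run: DeltaTable.vacuum() accepts the retention in hours, and if the value falls below the default 168-hour safety threshold, Delta refuses to vacuum unless spark.databricks.delta.retentionDurationCheck.enabled is disabled for the session (do this with care).

    # Allow a retention window shorter than the 7-day default; only disable
    # this safety check if no concurrent readers or writers still need the
    # files that will be deleted.
    spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false')
    
    # Vacuum with the retention (in hours) computed above.
    delta_table.vacuum(retentionHours=retention)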