
Concurrency issue in Databricks Delta lake


I have an audit table in Databricks Delta Lake with four fields: id, task_name, start_time, and end_time. The table captures the start and end times of each job. However, when I run five notebooks in parallel I hit concurrency conflicts during both insertion and updating. To address the update conflicts, I have partitioned the audit table on the task_name field, but have yet to test it. I am now stuck on concurrent row insertion: I am looking for concurrency-safe logic for generating id values that does not rely on the Delta table's identity column, since the identity property has caused problems in my Delta table. I would greatly appreciate any suggestions.


Solution

  • Even if you do the partitioning, you still need a condition on a specific partition value, not only source.partition = dest.partition - it should be source.partition = dest.partition AND dest.partition = 'job_name'. This is demonstrated in the Delta Lake documentation on avoiding conflicts. But this approach will generate many partitions with small files, which will hurt performance when you read the data.
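
    As a sketch of that predicate (table and column names here are hypothetical, matching the audit table described in the question), a MERGE that pins the target partition to a literal value would look like:

    ```sql
    -- Hypothetical audit table partitioned by task_name.
    -- Pinning dest.task_name to a literal lets Delta prune to a single
    -- partition, so concurrent merges on other task_name values should
    -- not conflict with this one.
    MERGE INTO audit AS dest
    USING updates AS source
      ON  source.task_name = dest.task_name
      AND dest.task_name = 'job_name'      -- the extra literal predicate
      AND source.id = dest.id
    WHEN MATCHED THEN
      UPDATE SET dest.end_time = source.end_time
    WHEN NOT MATCHED THEN
      INSERT (id, task_name, start_time, end_time)
      VALUES (source.id, source.task_name, source.start_time, source.end_time);
    ```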

    But you can avoid conflicts in the delta table if you switch to the append-only solution, where you will append starts & stops as individual rows, and then have a view on top of that table to find the latest status. Something like this:

    • Create and fill the table with sample data:
    create table if not exists process_statuses (
      run_id long,
      process string,
      status string,
      timestamp timestamp
    );
    
    insert into process_statuses(run_id, process, status, timestamp) values(1, 'test1', 'started', current_timestamp());
    insert into process_statuses(run_id, process, status, timestamp) values(2, 'test2', 'started', current_timestamp());
    insert into process_statuses(run_id, process, status, timestamp) values(1, 'test1', 'finished', current_timestamp());
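
    If you still need to assign unique run_id values from concurrently running notebooks, one approach that requires no coordination at all is to generate the ids client-side. A minimal sketch in Python, assuming a string (UUID) id is acceptable in place of an integer identity column:

    ```python
    # Sketch: concurrency-safe id generation without a Delta identity column.
    # Assumption: a globally unique string id is acceptable instead of a
    # monotonically increasing integer. UUID4 values can be generated
    # independently in each notebook with no shared state and no conflicts.
    import uuid

    def new_run_id() -> str:
        """Return a collision-resistant id safe to generate from parallel notebooks."""
        return str(uuid.uuid4())

    # Each of the five parallel notebooks can call this independently:
    run_id = new_run_id()
    print(run_id)  # e.g. 'a3f1c2e4-...' (36-character UUID string)
    ```

    The trade-off is that UUIDs are not ordered, so if you need ordering you would sort on the timestamp column rather than the id.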
    
    • Create a view to get latest status of all jobs or specific job:
    create or replace view latest_process_status as (
      with cte as (
        select *,
               row_number() over (partition by run_id order by timestamp desc) as rn
          from process_statuses)
      select * except(rn) from cte where rn = 1
    );
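
    • With the view in place, the latest status for a specific job is an ordinary filter (the process name 'test1' comes from the sample data above):

    ```sql
    -- Latest status per run, filtered to one process.
    select * from latest_process_status where process = 'test1';
    -- Given the sample rows above, this should return the row
    -- for run_id 1 with status 'finished'.
    ```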