Search code examples
postgresqlselect-for-update

How to select for update one row from table A and all joined rows from table B in Postgres?


I have a table that is a queue of tasks with each task requiring exclusive access to several resources. I want my query to select a single task that doesn't need resources claimed by other similar sessions.

If each task had to work on a single resource I would've written something like this:

select *
from tasks t
inner join resources r
on r.id = t.resource_id
order by t.submitted_ts
limit 1
for update skip locked

But since I have multiple resources I somehow have to lock them all:

select *
from tasks t
inner join task_details td
on t.id = td.task_id
inner join resources r
on r.id = td.resource_id
order by t.submitted_ts, t.id
limit ???
for update skip locked

I cannot limit by 1, since I need to lock all joined rows of resources.

It also seems to me that I should try and lock all rows of resources, so it must be not skip locked, but nowait for resources and skip locked for tasks.


Solution

  • First I had to create a helper function that either locks all linked rows or not:

    create or replace function try_lock_resources(p_task_id bigint)
    returns boolean
    language plpgsql
    as $$
    begin
        perform *
        from task_details td
        join resources r
        on td.resource_id = r.resource_id
        where td.task_id = p_task_id 
        for update of r nowait;
    
        return true;
    exception when lock_not_available then
        return false;
    end;
    $$;
    

    Then I needed to invoke this function for each row:

    select *
    from tasks
    where processing_status = 'Unprocessed'
    and try_lock_resources(task_id)
    order by created_ts
    limit 1
    for update skip locked
    

    After this query is run, only the returned row and its associated resources are locked. I verified that an identical query from another session returns the first unprocessed tasks that has no resources in common with the one returned by the first session.

    P.S.: the original answer used a different query (which you shouldn't use as is):

    with unprocessed_tasks as materialized (
        select * 
        from tasks t
        where processing_status = 'Unprocessed'
        order by created_ts
    )
    select *
    from unprocessed_tasks
    where try_lock_resources(task_id)
    limit 1
    for update skip locked
    

    The problem with this query is that the following could (and did happen):

    • session A runs the query, locks task X and starts working on it
    • session B starts running the query, the materialized CTE is run first, returning task X among other tasks
    • session A commits the transaction and releases all locks
    • session B finishes running the query, locks task X and starts working on it