Search code examples
Tags: postgresql, stored-procedures, plpgsql

How to return temporary table from a PostgreSQL function?


I'm trying to use a PostgreSQL database as a simple job queue for running some simulations. It's been a while since I've written SQL, and despite my best efforts (using ChatGPT, googling the myriad error messages, reading the documentation), I've been unable to figure this one out.

When I run this function, I keep getting this error: `ERROR: malformed array literal: "1"` (DETAIL: `Array value must start with "{" or dimension information.`)

I think that's because, for some reason, the query returns only the job_id column.

I'm using the following schema:

runner

-- Registered job runners. Rows are identified by a sequence-backed integer id;
-- alias and hostname are required, last_seen_at starts out NULL.
CREATE TABLE runner (
    id           integer                  NOT NULL,
    created_at   timestamp with time zone NOT NULL DEFAULT clock_timestamp(),
    last_seen_at timestamp with time zone,
    alias        character varying(255)   NOT NULL,
    hostname     character varying(255)   NOT NULL,
    CONSTRAINT runner_pkey PRIMARY KEY (id)
);

-- The id sequence is created after the table so it can be owned by runner.id
-- (dropping the column or table drops the sequence too).
CREATE SEQUENCE runner_id_seq
    AS integer
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    START WITH 1
    CACHE 1
    OWNED BY runner.id;

ALTER TABLE ONLY runner
    ALTER COLUMN id SET DEFAULT nextval('runner_id_seq'::regclass);

job

-- Lifecycle states of a job; label order is kept as declared because enum
-- values compare in declaration order.
CREATE TYPE public.job_status AS ENUM ('pending', 'failed', 'complete', 'running');

-- Queue entries. attempted_by / completed_by point at runner rows; both are
-- nullable until a runner picks up or finishes the job.
CREATE TABLE job (
    id                integer                  NOT NULL,
    created_at        timestamp with time zone NOT NULL DEFAULT clock_timestamp(),
    completed_at      timestamp with time zone,
    status            public.job_status        NOT NULL DEFAULT 'pending'::public.job_status,
    specification     jsonb                    NOT NULL,
    upstream_manifest integer,
    completed_by      integer,
    attempted_by      integer,
    CONSTRAINT job_pkey PRIMARY KEY (id),
    CONSTRAINT job_attempted_by_fkey FOREIGN KEY (attempted_by) REFERENCES runner (id),
    CONSTRAINT job_completed_by_fkey FOREIGN KEY (completed_by) REFERENCES runner (id)
);

-- Sequence for job.id, owned by the column it feeds.
CREATE SEQUENCE job_id_seq
    AS integer
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    START WITH 1
    CACHE 1
    OWNED BY job.id;

ALTER TABLE ONLY job
    ALTER COLUMN id SET DEFAULT nextval('job_id_seq'::regclass);

Seeding the tables

-- One test runner and one pending job (status falls back to its 'pending' default).
INSERT INTO runner (hostname, alias)
VALUES ('test-host', 'test01');

INSERT INTO job (specification)
VALUES (CAST('{"spec": true}' AS jsonb));

The request_jobs function

-- request_jobs (question version): meant to claim up to total_jobs 'pending'
-- jobs for the runner whose alias is runner_alias, and return them.
-- Planned steps:
-- 1. Get runner ID by alias
-- 2. Update runner last seen time (trigger?) -- not implemented in the body below
-- 3. Select 'pending' total_jobs limit total_jobs
-- 4. Set the foreign key of the runner who's attempting this job
--
-- NOTE(review): as written this fails at run time. selected_jobs is a PL/pgSQL
-- array variable, but the body uses it both as a multi-row SELECT ... INTO
-- target and as a table in FROM clauses; see the inline notes.
CREATE OR REPLACE FUNCTION public.request_jobs(runner_alias text, total_jobs integer)
RETURNS TABLE(job_id integer, specification json) AS $$
DECLARE
    -- id of the runner whose alias matches runner_alias (left NULL if none matches)
    runner_id integer;
    -- Array of job rows, intended to hold the claimed jobs.
    selected_jobs job[];
BEGIN
    -- Nested PL/pgSQL block (BEGIN ... END). This does not start a transaction;
    -- the function already runs inside the caller's transaction.
    BEGIN
        -- Retrieve the runner_id based on the runner_alias
        SELECT id INTO runner_id FROM runner WHERE alias = runner_alias;

        -- Select the first total_jobs pending jobs to assign them to the runner
        -- (there is no ORDER BY, so which pending jobs count as "first" is unspecified).
        -- This is where the reported error comes from: selected_jobs is a single
        -- non-row variable, so SELECT ... INTO stores only the first column of the
        -- first row (the integer id) and tries to convert it to job[], which fails
        -- with 'malformed array literal: "1"'. The other rows and the
        -- specification column are never captured.
        SELECT j.id as job_id, j.specification
        FROM job AS j
        WHERE status = 'pending'
        LIMIT total_jobs
        FOR UPDATE SKIP LOCKED
        INTO selected_jobs;

        -- Update the selected jobs to set their status to 'running' and assign them to the runner
        -- NOTE(review): selected_jobs is a variable, not a table, so it cannot be
        -- used in FROM. Also, job_id is the name of this function's OUT parameter;
        -- by default PL/pgSQL reports an unqualified name that matches both a
        -- variable and a column as ambiguous, so qualify it (e.g. s.job_id).
        UPDATE job
        SET attempted_by = runner_id,
            status = 'running'
        WHERE id IN (SELECT job_id FROM selected_jobs);

        -- Return the selected jobs
        -- NOTE(review): same FROM-clause problem as above. RETURN QUERY also
        -- requires column types that match the declared result columns exactly,
        -- and job.specification is jsonb while the declared column is json.
        RETURN QUERY SELECT s.job_id, s.specification FROM selected_jobs as s;
    END;

END;
$$ LANGUAGE plpgsql;

I've tried rewriting it with a FOR loop (RETURN NEXT), using RETURNS SETOF job, and various other tweaks, but so far, I haven't been able to crack it.


Solution

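  • I ended up fixing it with a temporary table. A PL/pgSQL variable cannot be used as a table in a FROM clause, so the selected rows are inserted into a temporary table that the following UPDATE and RETURN QUERY statements can read: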

    -- request_jobs: claim up to total_jobs 'pending' jobs for the runner with
    -- alias runner_alias, mark them 'running' with attempted_by set to that
    -- runner, and return them.
    --
    -- Parameters:
    --   runner_alias  alias of an api.runner row. An unknown alias raises an
    --                 exception instead of marking jobs 'running' with no owner.
    --                 NOTE(review): alias has no UNIQUE constraint in the schema
    --                 shown; if duplicates exist, SELECT INTO takes one arbitrary row.
    --   total_jobs    maximum number of jobs to claim. NULL or < 1 claims nothing
    --                 (LIMIT NULL would otherwise mean "no limit" and claim every
    --                 pending job).
    -- Returns: one (job_id, specification) row per claimed job, ordered by job_id.
    --
    -- FOR UPDATE SKIP LOCKED skips pending rows that another transaction has
    -- already locked, so concurrent callers do not wait on each other or pick
    -- up the same locked job.
    CREATE OR REPLACE FUNCTION public.request_jobs(runner_alias text, total_jobs integer)
    RETURNS TABLE(job_id integer, specification json) AS $$
    DECLARE
        v_runner_id integer;
    BEGIN
        -- No nested BEGIN ... END is needed: that would only open a PL/pgSQL
        -- block, not a transaction. The function runs in the caller's transaction.

        -- Resolve the runner. Without this check an unknown alias leaves the id
        -- NULL and the UPDATE below would assign jobs to nobody.
        SELECT r.id INTO v_runner_id FROM api.runner AS r WHERE r.alias = runner_alias;
        IF NOT FOUND THEN
            RAISE EXCEPTION 'unknown runner alias: %', runner_alias;
        END IF;

        IF total_jobs IS NULL OR total_jobs < 1 THEN
            RETURN;
        END IF;

        -- Stage the claimed rows in a temporary table that the statements below
        -- can query. ON COMMIT DROP is only a backstop: the table is dropped
        -- explicitly at the end so the function can be called more than once in
        -- the same transaction without "relation already exists".
        CREATE TEMPORARY TABLE selected_jobs (job_id integer, specification json)
        ON COMMIT DROP;

        -- Lock pending jobs in id order, skipping rows another transaction holds.
        INSERT INTO selected_jobs (job_id, specification)
        SELECT j.id, j.specification::json
        FROM api.job AS j
        WHERE j.status = 'pending'
        ORDER BY j.id
        LIMIT total_jobs
        FOR UPDATE SKIP LOCKED;

        UPDATE api.job AS j
        SET attempted_by = v_runner_id,
            status = 'running'
        WHERE j.id IN (SELECT s.job_id FROM selected_jobs AS s);

        RETURN QUERY
        SELECT s.job_id, s.specification
        FROM selected_jobs AS s
        ORDER BY s.job_id;

        -- RETURN QUERY has already collected its rows (it does not exit the
        -- function), so the staging table can be removed now. pg_temp pins the
        -- drop to this session's temporary schema.
        DROP TABLE pg_temp.selected_jobs;
    END;
    $$ LANGUAGE plpgsql;