I'm trying to use a PostgreSQL database as a simple job queue for running some simulations. It's been a while since I've written SQL, and despite my best efforts (using ChatGPT, googling the myriad error messages, reading the documentation), I've been unable to figure this one out.
When running this function, I keep getting Array value must start with "{" or dimension information. malformed array literal: "1"
I think that's because when it returns, for some reason it's returning just the job_id
column.
I'm using the following schema:
CREATE TABLE runner (
id integer NOT NULL,
created_at timestamp with time zone DEFAULT clock_timestamp() NOT NULL,
last_seen_at timestamp with time zone,
alias character varying(255) NOT NULL,
hostname character varying(255) NOT NULL
);
CREATE SEQUENCE runner_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE runner_id_seq OWNED BY runner.id;
ALTER TABLE ONLY runner ALTER COLUMN id SET DEFAULT nextval('runner_id_seq'::regclass);
ALTER TABLE ONLY runner ADD CONSTRAINT runner_pkey PRIMARY KEY (id);
CREATE TYPE public.job_status AS ENUM (
'pending',
'failed',
'complete',
'running'
);
CREATE TABLE job (
id integer NOT NULL,
created_at timestamp with time zone DEFAULT clock_timestamp() NOT NULL,
completed_at timestamp with time zone,
status public.job_status DEFAULT 'pending'::public.job_status NOT NULL,
specification jsonb NOT NULL,
upstream_manifest integer,
completed_by integer,
attempted_by integer
);
CREATE SEQUENCE job_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE job_id_seq OWNED BY job.id;
ALTER TABLE ONLY job ALTER COLUMN id SET DEFAULT nextval('job_id_seq'::regclass);
ALTER TABLE ONLY job
ADD CONSTRAINT job_pkey PRIMARY KEY (id);
ALTER TABLE ONLY job
ADD CONSTRAINT job_attempted_by_fkey FOREIGN KEY (attempted_by) REFERENCES runner(id);
ALTER TABLE ONLY job
ADD CONSTRAINT job_completed_by_fkey FOREIGN KEY (completed_by) REFERENCES runner(id);
INSERT INTO runner (alias, hostname) VALUES ('test01', 'test-host');
INSERT INTO job (specification) VALUES ('{"spec": true}'::jsonb);
-- 1. Get runner ID by alias
-- 2. Update runner last seen time (trigger?)
-- 3. Select 'pending' total_jobs limit total_jobs
-- 4. Set the foreign key of the runner who's attempting this job
CREATE OR REPLACE FUNCTION public.request_jobs(runner_alias text, total_jobs integer)
RETURNS TABLE(job_id integer, specification json) AS $$
DECLARE
runner_id integer;
selected_jobs job[];
BEGIN
-- Start a transaction
BEGIN
-- Retrieve the runner_id based on the runner_alias
SELECT id INTO runner_id FROM runner WHERE alias = runner_alias;
-- Select the first total_jobs pending jobs to assign them to the runner
SELECT j.id as job_id, j.specification
FROM job AS j
WHERE status = 'pending'
LIMIT total_jobs
FOR UPDATE SKIP LOCKED
INTO selected_jobs;
-- Update the selected jobs to set their status to 'running' and assign them to the runner
UPDATE job
SET attempted_by = runner_id,
status = 'running'
WHERE id IN (SELECT job_id FROM selected_jobs);
-- Return the selected jobs
RETURN QUERY SELECT s.job_id, s.specification FROM selected_jobs as s;
END;
END;
$$ LANGUAGE plpgsql;
I've tried rewriting it with a FOR loop (RETURN NEXT), using RETURNS SETOF job, and various other tweaks, but so far, I haven't been able to crack it.
I ended up fixing it by using a temporary table approach:
CREATE OR REPLACE FUNCTION public.request_jobs(runner_alias text, total_jobs integer)
RETURNS TABLE(job_id integer, specification json) AS $$
DECLARE
runner_id integer;
BEGIN
-- Start a transaction
BEGIN
-- Retrieve the runner_id based on the runner_alias
SELECT id INTO runner_id FROM api.runner WHERE alias = runner_alias;
-- Create a temporary table to insert the data into
CREATE TEMPORARY TABLE selected_jobs (job_id integer, specification json)
ON COMMIT DROP;
-- Select the first total_jobs pending jobs to assign them to the runner
INSERT INTO selected_jobs(job_id, specification)
SELECT j.id, j.specification
FROM api.job AS j
WHERE status = 'pending'
LIMIT total_jobs
FOR UPDATE SKIP LOCKED;
-- Update the selected jobs to set their status to 'running' and assign them to the runner
UPDATE api.job j
SET attempted_by = runner_id,
status = 'running'
WHERE j.id IN (SELECT s.job_id FROM selected_jobs s);
-- Return the selected jobs
RETURN QUERY SELECT s.job_id, s.specification FROM selected_jobs as s;
END;
END;
$$ LANGUAGE plpgsql;