Search code examples
amazon-web-servicestriggersamazon-rdspg-cron

Adding a custom trigger to Postgres pg_cron's job_run_details table on RDS


I'm using pg_cron, and would like to add a trigger to copy failed results into a custom system_message table. I've got this mocked up locally, but it's not working on RDS. And, there are no errors to see. Before digging into this more, and committing to this strategy, is it even a good or safe idea to add a custom trigger on top of pg_cron?

Another alternative is to write a standard function to copy the data, run that in a pg_cron task, and stash the max runid to use as a "find greater than this" search on the next copy. However, given the vagaries of concurrent systems, I might well end up missing failed jobs this way. Which is why I thought of the row trigger in the first place.

Thanks for any advice or alternative ideas.

Under "efforts so far":

GRANT TRIGGER on target table

GRANT TRIGGER on cron.job_run_details TO rds_super;

Define trigger function

CREATE OR REPLACE FUNCTION dba.trigger_function_job_run_details_after_insert()
RETURNS trigger AS

$BODY$

DECLARE
   result_v int4 = 0;

BEGIN
   
   IF NEW.status = 'failed' THEN
        SELECT dba.system_message_add (
             'Error',
             'pg_cron job failed',
              to_jsonb(NEW)
            ) INTO result_v; -- Could alternatively use PERFORM and ignore the result....we don't care about it anyway.
   END IF;
   RETURN NEW;
   
END

$BODY$

  LANGUAGE plpgsql 
  VOLATILE;

COMMENT ON FUNCTION dba.trigger_function_job_run_details_after_insert() IS 
'Post an error to system_message when a pg_cron job fails.';

ALTER FUNCTION dba.trigger_function_job_run_details_after_insert
    OWNER TO user_bender;

Define trigger binding

-- Could use a STATEMENT trigger here...figured I'd leave it more obvious what's happening.
CREATE OR REPLACE TRIGGER trigger_job_run_details_after_insert
  AFTER INSERT
            ON cron.job_run_details
           FOR EACH ROW
       EXECUTE PROCEDURE dba.trigger_function_job_run_details_after_insert();

system_message_kind domain

------------------------
-- system_message_kind
------------------------
DROP DOMAIN IF EXISTS domains.system_message_kind;

CREATE DOMAIN domains.system_message_kind AS
    citext
    NOT NULL
    CONSTRAINT system_message_kind_legal_values
        CHECK(
            VALUE IN (
            'Info','Advice','Notice','Error')
     );


COMMENT ON DOMAIN domains.system_message_kind IS
    'Constrains system_message.kind field and parameter values.';

system_message table

BEGIN;

DROP TABLE IF EXISTS dba.system_message;

CREATE TABLE IF NOT EXISTS dba.system_message (

    id                      int8                  GENERATED ALWAYS AS IDENTITY,

    created_dts             timestamp             NOT NULL DEFAULT NOW(),
    
                                                  -- Tip: Giving the CHECK a specific name improves error results, you'll see database_name_is_known in the message.
    database_name           citext                NOT NULL DEFAULT  current_database()     
                                                        CONSTRAINT  database_name_is_known -- Name shows up in errors.
                                                             CHECK (database_name IN ('postgres','nautilus','squid')),

    kind                     system_message_kind  NOT NULL DEFAULT NULL, -- 'Info','Advice','Notice','Error' in custom 'domain'

    subject                  citext               NOT NULL DEFAULT NULL,  -- Makes no sense to have an empty message.
    payload_text             citext               NOT NULL DEFAULT '',    -- You can include text, json, neither, or both. So far.  
    payload_json             jsonb                NOT NULL DEFAULT '{}'          

);

ALTER TABLE dba.system_message
    OWNER TO user_change_structure;

------------------------------------
-- FILLFACTOR
------------------------------------
-- This is a high-thrash table, by nature. It's a queue with short-lived rows.
ALTER TABLE dba.system_message
       SET (FILLFACTOR = 85);

system_message_add function

/*

kind:    
required 
DOMAIN used to automate type-check on parameter, and constrain values to legal kinds.

subject:
required
Anything you like

payload: 
Optional
text, json, text & json, or neither


Simple script to exercise the different parameter lists:

truncate table system_message;

select * from system_message_add('info','subject only');
select * from system_message_add('info','subject and text','Payload text');
select * from system_message_add('info','subject and json','{ "json": "payload only"}'::jsonb);
select * from system_message_add('info','subject text and json','Payload text','{ "json": "text and jsonb payloads"}'::jsonb);

select * from system_message;

+----+----------------------------+---------------+------+-----------------------+--------------+-------------------------------------+
| id | created_dts                | database_name | kind | subject               | payload_text | payload_json                        |
+----+----------------------------+---------------+------+-----------------------+--------------+-------------------------------------+
| 18 | 2024-02-19 08:31:08.782403 | squid         | info | subject only          |              | {}                                  |
| 19 | 2024-02-19 08:31:08.797859 | squid         | info | subject and text      | Payload text | {}                                  |
| 20 | 2024-02-19 08:31:08.803001 | squid         | info | subject and json      |              | {"json": "payload only"}            |
| 21 | 2024-02-19 08:31:08.815063 | squid         | info | subject text and json | Payload text | {"json": "text and jsonb payloads"} |
+----+----------------------------+---------------+------+-----------------------+--------------+-------------------------------------+

Note: Given the parameter options and how data is passed, you'll end up with your JSON in the text field, if you don't cast it to JSONB.
I'm anticipating that we'll be passing through existing JSON results, in the real world. Time will tell.
*/


------------------------------------------------------------------
-- system_message_add (kind, subject)
------------------------------------------------------------------
-- Weird one, only useful for a basic signal...not sure that we want this.
CREATE OR REPLACE FUNCTION dba.system_message_add (
      kind_in           system_message_kind,
      subject_in        citext)
      
 RETURNS int4 AS
 
$BODY$
 
 INSERT INTO dba.system_message
            (kind,    subject)
     VALUES (kind_in, subject_in)
     
   RETURNING 1;     
 
$BODY$
LANGUAGE SQL;

COMMENT ON FUNCTION dba.system_message_add (system_message_kind, citext) IS 
'Add a system message.';

ALTER FUNCTION dba.system_message_add (system_message_kind, citext)
    OWNER TO user_bender;

------------------------------------------------------------------
-- system_message_add (kind, subject, payload text)
------------------------------------------------------------------
CREATE OR REPLACE FUNCTION dba.system_message_add (
      kind_in           system_message_kind,
      subject_in        citext,
      payload_text_in   citext)
      
 RETURNS int4 AS
 
$BODY$
 
 INSERT INTO dba.system_message
            (kind,    subject,    payload_text)
     VALUES (kind_in, subject_in, payload_text_in)
     
   RETURNING 1;     
 
$BODY$
LANGUAGE SQL;

COMMENT ON FUNCTION dba.system_message_add (system_message_kind, citext, citext) IS 
'Add a system message.';

ALTER FUNCTION dba.system_message_add (system_message_kind, citext, citext)
    OWNER TO user_bender;

------------------------------------------------------------------
-- system_message_add (kind, subject, payload json)
------------------------------------------------------------------
CREATE OR REPLACE FUNCTION dba.system_message_add (
      kind_in           system_message_kind,
      subject_in        citext,
      payload_json_in   jsonb)
      
 RETURNS int4 AS
 
$BODY$
 
 INSERT INTO dba.system_message
            (kind,    subject,    payload_json)
     VALUES (kind_in, subject_in, payload_json_in)
     
   RETURNING 1;     
 
$BODY$
LANGUAGE SQL;

COMMENT ON FUNCTION dba.system_message_add (system_message_kind, citext, jsonb) IS 
'Add a system message.';

ALTER FUNCTION dba.system_message_add (system_message_kind, citext, jsonb)
    OWNER TO user_bender;

------------------------------------------------------------------
-- system_message_add (kind, subject, payload_text, payload json)
------------------------------------------------------------------
CREATE OR REPLACE FUNCTION dba.system_message_add (
      kind_in           system_message_kind,
      subject_in        citext,
      payload_text_in   citext,
      payload_json_in   jsonb)
      
 RETURNS int4 AS
 
$BODY$
 
 INSERT INTO dba.system_message
            (kind,    subject,    payload_text,    payload_json)
     VALUES (kind_in, subject_in, payload_text_in, payload_json_in)
     
   RETURNING 1;     
 
$BODY$
LANGUAGE SQL;

COMMENT ON FUNCTION dba.system_message_add (system_message_kind, citext, citext, jsonb) IS 
'Add a system message.';

ALTER FUNCTION dba.system_message_add (system_message_kind, citext, citext, jsonb)
    OWNER TO user_bender;

Solution

  • It Works....

    I've finally found a few hours to swing back to this and, yes, you can get a custom trigger on cron.job_run_details to do something with failed results. In my case, inserting a record in another table. There are some wrinkles (pg_cron) and quirks (RDS). Short version, for anyone else trying to do this:

    • You need an AFTER UPDATE trigger, not an AFTER INSERT trigger.

    • RDS runs pg_cron jobs as rds_superuser. Not a role you can mess with, but you can inherit from it, and you can GRANT schema USAGE and routine EXECUTE. And you need to.

    pg_cron INSERT and UPDATE

    When it starts a job, pg_cron seems to create a job_run_details record, which you can see in an AFTER INSERT trigger function:

    {
        "jobid": 83,
        "runid": 140394,
        "status": "starting",
        "command": "SELECT foo FROM bar;",
        "job_pid": null,
        "database": "nautilus",
        "end_time": null,
        "username": "rds_super",
        "start_time": null,
        "return_message": null
    }
    

    The status = 'starting'. Fair. I'd assumed, for absolutely no reason, that a completed record was inserted once the job was finished. If you want the final record, use AFTER UPDATE and check for status = 'failed', if you're needs are like mine.

    Permissions

    pg_cron runs as background worker process, and the logs don't indicate a user name. I wrote out current_user and it's rds_superuser. I only got things working acceptably, once I'd granted rds_superuser explicit USAGE and EXECUTE rights. Before that, I was getting errors on citext_eq and other random extension functions. I've ignored rds_superuser as a mater of course...now I know. I'll look at changed he default privileges for the schemas in the postgres database.

    Troubleshooting and logs on RDS

    I'm glad we're deploying on something like RDS as I don't want to spend even more of my day on DBA work. But....sometimes you really want a file system. Ever more so for log files. I used Splunk intensely for some time, and am now spoiled for life. Don't have it now :( Anyway, it's hard to find a lot of the Postgres logs on RDS, they're not all exposed. However, you can get the error logs, relatively live. There's a log_fdw for this purpose. Doesn't work. Well, works a little, but not reliably. Smells like an abandoned feature.

    Instead, you can see and download live logs in the AWS console for your instance. There's a little lag here, but it's not bad. A bit worse when the log is rolled but, even then, a few seconds. Still, it's kind of awkward. However, a life-saver, when you need it.

    I like to force descriptive errors into the log to mark my place, like:

    select changed_the_permissions_for_rds_superuser
    

    Not a meaningful SQL statement, but it pushes it right into the log. Then I can look for events of interest afer that entry.

    Final trigger function and bindings

    Here's where the code ended up:

    --------------------------------------------
    -- Create AFTER UPDATE function
    --------------------------------------------
    CREATE OR REPLACE FUNCTION dba.trigger_function_job_run_details_after_update()
    RETURNS trigger AS
    
    $BODY$
    
    BEGIN
    
       IF NEW.status = 'failed' THEN
    
             INSERT INTO  dba.system_message (kind, subject, payload_json)
                  VALUES ('Error',
                          'pg_cron job failed',
                           to_json(NEW)
                      );
       END IF;
    
       RETURN NEW;
    
    END
    
    $BODY$
    
      LANGUAGE plpgsql 
      VOLATILE;
    
    COMMENT ON FUNCTION dba.trigger_function_job_run_details_after_update() IS 
    'Post an error to system_message when a pg_cron job fails.';
    
    ALTER FUNCTION dba.trigger_function_job_run_details_after_update
        OWNER TO user_bender;
    
    --------------------------------------------
    -- Create trigger binding
    --------------------------------------------
    CREATE OR REPLACE TRIGGER trigger_job_run_details_after_update
             AFTER UPDATE
                ON cron.job_run_details
               FOR EACH ROW
           EXECUTE PROCEDURE dba.trigger_function_job_run_details_after_update();