
Auditing or Rowcount for each DML Operation in Snowflake Stored Procedure


I want to capture audit information inside a stored procedure for its MERGE and INSERT statements. I tried using RESULT_SCAN with a query ID taken from QUERY_HISTORY_BY_SESSION, but these statements are not allowed in the JavaScript procedure. I then created a function that returns the query ID of the most recently executed INSERT or MERGE statement:

create or replace function GET_QUERY_ID()
   RETURNS VARCHAR
   AS 'SELECT QUID FROM (SELECT (QUERY_ID)::VARCHAR AS QUID FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION(CURRENT_SESSION()::NUMBER)) WHERE QUERY_TYPE IN (''INSERT'',''MERGE'') ORDER BY END_TIME DESC ) LIMIT 1';

I also tried creating another function to return the row counts:

create or replace function GET_RESULT_SCAN(P_QUERY_ID VARCHAR)
   RETURNS TABLE ( INSERT_ROWS NUMBER ,UPDATED_ROWS NUMBER)
   AS 'select * from (select "number of rows updated"::NUMBER as INSERT_ROWS, "number of multi-joined rows updated"::NUMBER as UPDATED_ROWS from table(result_scan(P_QUERY_ID)))';

But this is not working, and I can't call these functions from a stored procedure written in JavaScript. What is the best practice for auditing the number of rows inserted and updated in a stored procedure? Assume the procedure contains 5-10 SQL statements.


Solution

  • Here's a simple procedure that runs a merge, then captures and returns the number of rows inserted and the number of rows updated:

    CREATE OR REPLACE PROCEDURE utl.arch_merge_sp(P_STAGE_TBL VARCHAR, P_FINAL_TBL VARCHAR)
      RETURNS STRING
      LANGUAGE JAVASCRIPT
      EXECUTE AS CALLER
    AS $$
      var sqlCmd = "";
      var sqlStmt = "";
      var result = "";
      var rs;  // declared here so it does not become an implicit global
    
      try {
        sqlCmd = `
          MERGE INTO ${P_FINAL_TBL} F USING ${P_STAGE_TBL} S
          ON F.KEY_ID = S.KEY_ID
          WHEN MATCHED THEN UPDATE SET
           F.ATTR_NM = S.ATTR_NM
          ,F.ATTR_NBR = S.ATTR_NBR
          WHEN NOT MATCHED THEN INSERT (
           F.KEY_ID
          ,F.ATTR_NM
          ,F.ATTR_NBR
          ) VALUES (
           S.KEY_ID
          ,S.ATTR_NM
          ,S.ATTR_NBR);
          `;
        sqlStmt = snowflake.createStatement( {sqlText: sqlCmd} );
        rs = sqlStmt.execute();
    
        sqlCmd = 
          `SELECT "number of rows inserted", "number of rows updated"
            FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))`;
        sqlStmt = snowflake.createStatement( {sqlText: sqlCmd} );
        rs = sqlStmt.execute();
        rs.next();
    
        result += "Rows inserted: " + rs.getColumnValue(1) + ", Rows updated: " + rs.getColumnValue(2);
      }
      catch (err) {
        result =  "Failed: Code: " + err.code + " | State: " + err.state;
        result += "\n  Message: " + err.message;
        result += "\nStack Trace:\n" + err.stackTraceTxt;
      }
    
      return result;
    $$;
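
  • For a procedure with 5-10 statements, repeating the RESULT_SCAN query after each one gets verbose. A possible alternative, sketched below, reads the counts from the Statement object itself. It assumes the `getNumRowsInserted()`, `getNumRowsUpdated()` and `getQueryId()` methods of Snowflake's JavaScript stored procedure API; `runWithAudit`, `steps`, `fakeSnowflake` and the `log_t` table are hypothetical names used only for illustration, and the stub exists only so the sketch runs outside Snowflake.

```javascript
// Hypothetical helper (not from the original answer): run each step and read
// the row counts from the Statement object rather than from RESULT_SCAN.
// `sf` is the `snowflake` object that Snowflake provides inside a procedure.
function runWithAudit(sf, steps) {
  var audit = [];
  for (var i = 0; i < steps.length; i++) {
    var stmt = sf.createStatement({ sqlText: steps[i].sql });
    stmt.execute();
    audit.push({
      step: steps[i].name,
      queryId: stmt.getQueryId(),
      inserted: stmt.getNumRowsInserted(),
      updated: stmt.getNumRowsUpdated()
    });
  }
  return audit;
}

// Stand-in for `snowflake` so the sketch can run outside Snowflake.
// The counts it reports are made up for illustration.
var fakeSnowflake = {
  createStatement: function (opts) {
    var isMerge = /^\s*MERGE/i.test(opts.sqlText);
    return {
      execute: function () { return null; },
      getQueryId: function () { return "fake-query-id"; },
      getNumRowsInserted: function () { return isMerge ? 3 : 5; },
      getNumRowsUpdated: function () { return isMerge ? 2 : 0; }
    };
  }
};

var audit = runWithAudit(fakeSnowflake, [
  { name: "merge_final", sql: "MERGE INTO final_t F USING stage_t S ON F.KEY_ID = S.KEY_ID WHEN MATCHED THEN UPDATE SET F.ATTR_NM = S.ATTR_NM" },
  { name: "insert_log", sql: "INSERT INTO log_t SELECT * FROM stage_t" }
]);
console.log(JSON.stringify(audit));
```

    Inside a real procedure you would pass the built-in `snowflake` object instead of the stub, then either return `JSON.stringify(audit)` or insert each entry into an audit table. Wrapping the loop in the same try/catch as the procedure above keeps the error reporting consistent.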