Tags: postgresql, amazon-web-services, pyspark, aws-glue

Update a Table Column in PostgreSQL within an AWS Glue Job Using PySpark SQL Queries


I am working with a large amount of data from Square, storing it in PostgreSQL using AWS Lambda functions. Afterward, I run AWS Glue ETL jobs to transform this data into the required format.

Currently, I have two Glue jobs:

  • One job transforms the order data from the "square_order" table and stores the aggregated data in a separate table called "square_dashboard."
  • Another job transforms customer data from the "square_customer" table.

The issue I am facing is that I need to update the "square_dashboard" table based on the aggregated data from the "square_customer" table.

I have a SQL query that works fine in PostgreSQL to perform this update:

-- First, update existing rows in the square_dashboard table
UPDATE square_dashboard sd
SET new_customer = subquery.new_customers
FROM (
    SELECT user_id, DATE(created_at) AS customer_date,
           COUNT(*) AS new_customers
    FROM square_customer
    GROUP BY user_id, DATE(created_at)
) AS subquery
WHERE sd.user_id = subquery.user_id
  AND sd.date = subquery.customer_date;

-- Insert new rows for cases where the date doesn't exist
INSERT INTO square_dashboard (user_id, date, new_customer, day)
SELECT subquery.user_id, 
       subquery.customer_date, 
       subquery.new_customers,
       TO_CHAR(subquery.customer_date, 'Dy')
FROM (
    SELECT user_id, DATE(created_at) AS customer_date,
           COUNT(*) AS new_customers
    FROM square_customer
    GROUP BY user_id, DATE(created_at)
) AS subquery
WHERE NOT EXISTS (
    SELECT 1 
    FROM square_dashboard sd 
    WHERE sd.user_id = subquery.user_id
      AND sd.date = subquery.customer_date
);

Now, I need to perform this update within my AWS Glue job. Is there a correct way to execute these SQL queries directly in a Glue job, or should I take a different approach?

I attempted to run the SQL queries directly in the AWS Glue job using PySpark's spark.sql() method, expecting them to update the square_dashboard table in PostgreSQL based on the aggregated data from the square_customer table, just as they do when run against PostgreSQL directly. However, I ran into difficulties integrating the queries into the Glue job's workflow, particularly with establishing a connection to the PostgreSQL database and executing the SQL statements correctly.
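
Roughly, what I tried looked like the sketch below (SQL abbreviated; spark is the Glue job's SparkSession). Since spark.sql() only resolves tables registered in Spark's own catalog and cannot issue UPDATE statements against an external JDBC database, the statement never reaches PostgreSQL:

    # Hypothetical sketch of the failed attempt; "spark" is the Glue job's SparkSession.
    update_sql = """
    UPDATE square_dashboard sd
    SET new_customer = subquery.new_customers
    FROM ( ... ) AS subquery
    WHERE sd.user_id = subquery.user_id
      AND sd.date = subquery.customer_date
    """
    # Fails: square_dashboard exists only in PostgreSQL, and Spark SQL cannot
    # UPDATE a JDBC-backed table, so the statement never runs in the database.
    spark.sql(update_sql)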


Solution

  • Create a dummy dataset and use glueContext.write_dynamic_frame to inject your custom SQL script through the preactions/postactions connection options. The following script was written for Redshift; update the connection options (and the catalog connection) to suit your database type.

    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame

    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session


    def execute_custom_SQL_script(sql_script):
        # One-row placeholder DataFrame: the write itself is throwaway and only
        # exists so that Glue executes sql_script via "postactions".
        data = [("dummy dataset!",)]
        columns = ["message"]
        df = spark.createDataFrame(data, columns)

        # Convert the Spark DataFrame to a DynamicFrame
        dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

        # The dummy table is written, the custom SQL runs as a postaction, and
        # the dummy table is dropped afterwards.
        glueContext.write_dynamic_frame.from_jdbc_conf(
            frame=dynamic_frame,
            catalog_connection="your_catalog_connection_",
            connection_options={
                "database": "your_database",
                "dbtable": "dummy_table",
                "preactions": "",
                "postactions": sql_script + '; drop table if exists dummy_table;'
            },
            redshift_tmp_dir="s3://123123123123",
            transformation_ctx="dynamic_frame_iteration")
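
    With the helper in place, the Glue job can pass the two statements from the question as a single script. This is only a sketch: it assumes the Glue setup and execute_custom_SQL_script defined above are already in scope.

    # Assumes execute_custom_SQL_script (and the Glue boilerplate) defined above.
    update_sql = """
    UPDATE square_dashboard sd
    SET new_customer = subquery.new_customers
    FROM (
        SELECT user_id, DATE(created_at) AS customer_date,
               COUNT(*) AS new_customers
        FROM square_customer
        GROUP BY user_id, DATE(created_at)
    ) AS subquery
    WHERE sd.user_id = subquery.user_id
      AND sd.date = subquery.customer_date
    """

    insert_sql = """
    INSERT INTO square_dashboard (user_id, date, new_customer, day)
    SELECT subquery.user_id,
           subquery.customer_date,
           subquery.new_customers,
           TO_CHAR(subquery.customer_date, 'Dy')
    FROM (
        SELECT user_id, DATE(created_at) AS customer_date,
               COUNT(*) AS new_customers
        FROM square_customer
        GROUP BY user_id, DATE(created_at)
    ) AS subquery
    WHERE NOT EXISTS (
        SELECT 1
        FROM square_dashboard sd
        WHERE sd.user_id = subquery.user_id
          AND sd.date = subquery.customer_date
    )
    """

    # Both statements run as "postactions" after the dummy write completes.
    execute_custom_SQL_script(update_sql + "; " + insert_sql)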