I am working with a large amount of data from Square, storing it in PostgreSQL using AWS Lambda functions. Afterward, I run AWS Glue ETL jobs to transform this data into the required format.
Currently, I have two Glue jobs:
1. One job transforms order data from the "square_order" table and stores the aggregated data in a separate table called "square_dashboard".
2. Another job transforms customer data from the "square_customer" table.

The issue I am facing is that I need to update the "square_dashboard" table based on the aggregated data from the "square_customer" table.
I have a SQL query that works fine in PostgreSQL to perform this update:
-- First, update existing rows in the square_dashboard table
UPDATE square_dashboard sd
SET new_customer = subquery.new_customers
FROM (
    SELECT user_id, DATE(created_at) AS customer_date,
           COUNT(*) AS new_customers
    FROM square_customer
    GROUP BY user_id, DATE(created_at)
) AS subquery
WHERE sd.user_id = subquery.user_id
  AND sd.date = subquery.customer_date;

-- Insert new rows for cases where the date doesn't exist
INSERT INTO square_dashboard (user_id, date, new_customer, day)
SELECT subquery.user_id,
       subquery.customer_date,
       subquery.new_customers,
       TO_CHAR(subquery.customer_date, 'Dy')
FROM (
    SELECT user_id, DATE(created_at) AS customer_date,
           COUNT(*) AS new_customers
    FROM square_customer
    GROUP BY user_id, DATE(created_at)
) AS subquery
WHERE NOT EXISTS (
    SELECT 1
    FROM square_dashboard sd
    WHERE sd.user_id = subquery.user_id
      AND sd.date = subquery.customer_date
);
Now, I need to perform this update within my AWS Glue job. Is there a correct way to execute these SQL queries directly in a Glue job, or should I take a different approach?
I attempted to run these SQL queries directly in the AWS Glue job using PySpark's spark.sql() method, expecting them to update the square_dashboard table in PostgreSQL based on the aggregated data from the square_customer table, just as they do when run against PostgreSQL directly. However, I had trouble integrating the queries into the Glue job's workflow, particularly with establishing a connection to the PostgreSQL database and actually executing the SQL statements against it.
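For reference, this is roughly the shape of what I tried; the JDBC URL and credentials below are placeholders. The aggregation itself runs in Spark, but spark.sql() only sees Spark's temporary views, so nothing is written back to PostgreSQL.

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Placeholder JDBC details; replace with the real connection info
jdbc_options = {
    "url": "jdbc:postgresql://your-host:5432/your_database",
    "dbtable": "square_customer",
    "user": "your_user",
    "password": "your_password",
}

# Read square_customer into Spark and expose it to spark.sql() as a temp view
customer_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options=jdbc_options,
)
customer_dyf.toDF().createOrReplaceTempView("square_customer")

# The aggregation works in Spark SQL...
new_customers_df = spark.sql("""
    SELECT user_id, DATE(created_at) AS customer_date, COUNT(*) AS new_customers
    FROM square_customer
    GROUP BY user_id, DATE(created_at)
""")

# ...but the UPDATE/INSERT against square_dashboard cannot be run this way,
# because spark.sql() operates on Spark's catalog, not on the live PostgreSQL tables.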
Create a dummy dataset and use glueContext.write_dynamic_frame to inject your custom SQL script via the preactions/postactions connection options. The following script is used on Redshift; please update the connection_options to suit your database type.
from awsglue.dynamicframe import DynamicFrame

# Assumes `spark` and `glueContext` are already defined by the Glue job's standard setup
def execute_custom_SQL_script(sql_script):
    # A single-row dummy DataFrame; its only purpose is to trigger the JDBC write
    data = [("dummy dataset!",)]
    columns = ["message"]
    df = spark.createDataFrame(data, columns)

    # Convert the Spark DataFrame to a DynamicFrame
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    # The custom SQL runs as a postaction after the dummy table is written,
    # and the dummy table is dropped afterwards
    glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=dynamic_frame,
        catalog_connection="your_catalog_connection_",
        connection_options={
            "database": "your_database",
            "dbtable": "dummy_table",
            "preactions": "",
            "postactions": sql_script + '; drop table if exists dummy_table;'
        },
        redshift_tmp_dir="s3://123123123123",
        transformation_ctx="dynamic_frame_iteration")
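Hypothetical usage with the queries from the question, keeping the caveat above in mind (the options shown are for Redshift and need adapting for PostgreSQL):

# Hypothetical usage: the UPDATE from the question, passed as the custom script.
# The INSERT statement can be appended to the same string, separated by a semicolon;
# no trailing semicolon is needed because the helper adds one before the DROP.
dashboard_update_sql = """
UPDATE square_dashboard sd
SET new_customer = subquery.new_customers
FROM (
    SELECT user_id, DATE(created_at) AS customer_date, COUNT(*) AS new_customers
    FROM square_customer
    GROUP BY user_id, DATE(created_at)
) AS subquery
WHERE sd.user_id = subquery.user_id
  AND sd.date = subquery.customer_date
"""

execute_custom_SQL_script(dashboard_update_sql)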