Tags: python, amazon-web-services, apache-spark, pyspark, aws-glue

How do I execute a SQL script or stored procedure after an AWS Glue job finishes?


I'm learning AWS Glue. With traditional ETL, a common pattern is to look up the primary key in the destination table to decide whether to do an update or an insert (the upsert design pattern). Glue doesn't seem to offer the same control: simply writing out the DynamicFrame is an insert-only process. I can think of two design patterns to solve this:

  1. Load the destination as a DataFrame and, in Spark, left outer join against it so that only new rows are inserted. (How would you update rows if you needed to? Delete and then insert? Since I'm new to Spark, this is the most foreign to me.)
  2. Load the data into a staging table and then use SQL to perform the final merge.
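A minimal sketch of the second option (staging table, then one set-based merge), using Python's built-in sqlite3 module as a stand-in for the RDS database. This is an assumption for illustration: on MySQL the merge statement would be INSERT ... ON DUPLICATE KEY UPDATE rather than SQLite's ON CONFLICT ... DO UPDATE (which needs SQLite 3.24 or later). The table names and rows are hypothetical.

```python
import sqlite3

# In-memory SQLite database standing in for the real destination (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE zip_terr (
        zip_code TEXT, territory_code TEXT, territory_name TEXT, state TEXT,
        PRIMARY KEY (zip_code, territory_code)
    );
    CREATE TABLE zip_terr_stage (
        zip_code TEXT, territory_code TEXT, territory_name TEXT, state TEXT
    );
    INSERT INTO zip_terr VALUES ('98101', 'T01', 'Old name', 'WA');
""")

# Step 1: the ETL job only appends its output to the staging table.
conn.executemany(
    "INSERT INTO zip_terr_stage VALUES (?, ?, ?, ?)",
    [("98101", "T01", "Seattle", "WA"), ("97201", "T02", "Portland", "OR")],
)

# Step 2: one SQL statement merges staging into the destination.
# "WHERE 1" is required by SQLite to disambiguate INSERT ... SELECT ... ON CONFLICT.
conn.execute("""
    INSERT INTO zip_terr (zip_code, territory_code, territory_name, state)
    SELECT zip_code, territory_code, territory_name, state
    FROM zip_terr_stage WHERE 1
    ON CONFLICT (zip_code, territory_code) DO UPDATE SET
        territory_name = excluded.territory_name,
        state = excluded.state
""")
conn.execute("DELETE FROM zip_terr_stage")  # clear staging for the next run
conn.commit()

print(conn.execute("SELECT * FROM zip_terr ORDER BY zip_code").fetchall())
# [('97201', 'T02', 'Portland', 'OR'), ('98101', 'T01', 'Seattle', 'WA')]
```

The existing key (98101, T01) is updated in place and the new key is inserted, so the destination never needs a delete-then-insert step.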

It's this second method that I'm exploring first. In the AWS world, how can I execute a SQL script or stored procedure once the AWS Glue job is complete? Do you use a Python shell job, a Lambda function, run it directly as part of the Glue job, or something else?
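One of the options mentioned, a Lambda function, can be triggered after the job by an Amazon EventBridge rule that matches the "Glue Job State Change" events Glue emits (their detail carries jobName, state, and jobRunId). A minimal sketch of such a handler, assuming that rule exists; the job name and the run_merge() hook are hypothetical placeholders for your own connection and merge SQL or stored procedure call.

```python
# Sketch of a Lambda handler for EventBridge "Glue Job State Change" events.
# MERGE_JOB_NAME and run_merge() are hypothetical placeholders.

MERGE_JOB_NAME = "my-glue-job"  # hypothetical Glue job name


def run_merge():
    """Hypothetical hook: connect to the database (e.g. with pymysql) and
    run the merge SQL or CALL the stored procedure here."""
    raise NotImplementedError


def handler(event, context, merge=run_merge):
    detail = event.get("detail", {})
    # Act only on successful runs of the job we care about; FAILED,
    # TIMEOUT, or STOPPED runs leave the staging table untouched.
    if (event.get("source") == "aws.glue"
            and detail.get("jobName") == MERGE_JOB_NAME
            and detail.get("state") == "SUCCEEDED"):
        merge()
        return {"merged": True, "jobRunId": detail.get("jobRunId")}
    return {"merged": False}
```

Lambda calls handler(event, context); the extra merge parameter only exists so the merge step can be swapped out when testing.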


Solution

  • I used the pymysql library, uploaded to Amazon S3 as a zip file and referenced in the AWS Glue job parameters. For upserts, I used INSERT INTO ... ON DUPLICATE KEY UPDATE.

    So, based on the primary key, the statement either updates the record if it already exists or inserts a new one. Hope this helps. Please refer to this:

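    import pymysql
    
    rds_host = "rds.url.aaa.us-west-2.rds.amazonaws.com"
    name = "username"
    password = "userpwd"  # avoid hard-coding credentials in real jobs
    db_name = "dbname"
    conn = pymysql.connect(host=rds_host, user=name, password=password,
                           database=db_name, connect_timeout=5)
    
    # Parameterized upsert: insert the row, or update the non-key columns
    # if a row with the same primary key (zip_code, territory_code) exists.
    insert_qry = (
        "INSERT INTO ZIP_TERR (zip_code, territory_code, territory_name, state) "
        "VALUES (%s, %s, %s, %s) "
        "ON DUPLICATE KEY UPDATE territory_name = VALUES(territory_name), "
        "state = VALUES(state)"
    )
    row = ("98101", "T01", "Seattle", "WA")  # hypothetical example values
    
    try:
        with conn.cursor() as cur:  # cursor is closed when the block exits
            cur.execute(insert_qry, row)
        conn.commit()
    finally:
        conn.close()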
    

    In the above code sample, zip_code and territory_code together form the primary key. Please refer here as well: More on looping inserts using a for loop
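Instead of calling execute() once per row in a for loop, the same upsert can be sent for many rows with the DB-API executemany() call. A minimal sketch, assuming a PyMySQL connection (or any DB-API connection whose placeholder style accepts %s); upsert_rows is a hypothetical helper name and the rows are whatever tuples your job produces.

```python
# Sketch: batch the upsert with executemany() instead of a per-row loop.
# Assumes a PyMySQL-style connection; upsert_rows is a hypothetical helper.

UPSERT_SQL = (
    "INSERT INTO ZIP_TERR (zip_code, territory_code, territory_name, state) "
    "VALUES (%s, %s, %s, %s) "
    "ON DUPLICATE KEY UPDATE territory_name = VALUES(territory_name), "
    "state = VALUES(state)"
)


def upsert_rows(conn, rows):
    """Upsert (zip_code, territory_code, territory_name, state) tuples.

    Returns the number of rows sent; commits once for the whole batch.
    """
    params = [tuple(r) for r in rows]
    if not params:
        return 0  # nothing to send, no empty transaction
    with conn.cursor() as cur:
        cur.executemany(UPSERT_SQL, params)
    conn.commit()
    return len(params)
```

Committing once per batch rather than once per row keeps the number of round trips and transactions low when the staged data set is large.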