
How to run arbitrary / DDL SQL statements or stored procedures using AWS Glue


Is it possible to execute arbitrary SQL commands like ALTER TABLE from an AWS Glue Python job? I know I can use it to read data from tables, but is there a way to execute other database-specific commands?

I need to ingest data into a target database and then run some ALTER commands right after.


Solution

  • So after doing extensive research and also opening a case with AWS support, they told me it is not possible from a Python shell or a Glue PySpark job at this moment. But I just tried something creative and it worked! The idea is to use py4j, which Spark already relies on, and utilize the standard java.sql package.

    Two huge benefits of this approach:

    1. You can define your database connection as a Glue data connection and keep the JDBC details and credentials there without hardcoding them in the Glue code. My example below does that by calling glueContext.extract_jdbc_conf('your_glue_data_connection_name') to get the JDBC URL and credentials defined in Glue.

    2. If you need to run SQL commands against a database Glue supports out of the box, you don't even need to use/pass a JDBC driver for that database: just make sure you set up a Glue connection for it and add that connection to your Glue job, and Glue will upload the proper database driver jars.

    Remember: the code below is executed by the driver process and cannot be executed by Spark workers/executors.

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    logger = glueContext.get_logger()
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    # dw-poc-dev spark test
    source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')
    
    from py4j.java_gateway import java_import
    java_import(sc._gateway.jvm,"java.sql.Connection")
    java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
    java_import(sc._gateway.jvm,"java.sql.DriverManager")
    java_import(sc._gateway.jvm,"java.sql.SQLException")
    
    conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
    
    print(conn.getMetaData().getDatabaseProductName())
    
    # call a stored procedure, in this case I call sp_start_job
    cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}")
    cstmt.setString("job_name", "testjob")
    results = cstmt.execute()
    
    conn.close()
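    Since the original question asks about ALTER TABLE: the same java.sql Connection can run arbitrary DDL through a plain Statement, not only stored-procedure calls. A minimal sketch, to be placed before the conn.close() call above; the table and column names are illustrative, and the exact ALTER TABLE syntax depends on your target database:

```python
# Run an arbitrary DDL statement over the same py4j-backed java.sql.Connection.
# 'conn' is the Connection obtained via DriverManager.getConnection above;
# the table/column names here are only examples.
stmt = conn.createStatement()
stmt.execute("ALTER TABLE target_schema.target_table ADD load_ts TIMESTAMP")
stmt.close()
```

    Statement.execute returns a boolean indicating whether the statement produced a result set; for DDL it does not, so the return value can be ignored.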