Tags: scala, pyspark, apache-spark-sql, aws-glue

How do I run SQL SELECT on AWS Glue created Dataframe in Spark?


I have the following job in AWS Glue which basically reads data from one table and extracts it as a CSV file in S3. However, I want to run a query on this table (a SELECT, SUM and GROUP BY) and write that output to CSV. How do I do this in AWS Glue? I am a newbie in Spark, so please help.

import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table from the Glue Data Catalog as a DynamicFrame
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="db1",
    table_name="dbo1_expdb_dbo_stg_plan",
    transformation_ctx="datasource0")

# Keep only the plan_code and plan_id columns
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("plan_code", "int", "plan_code", "int"),
              ("plan_id", "int", "plan_id", "int")],
    transformation_ctx="applymapping1")

# Write the result to S3 as CSV
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame=applymapping1,
    connection_type="s3",
    connection_options={"path": "s3://bucket"},
    format="csv",
    transformation_ctx="datasink2")
job.commit()

Solution

  • The "create_dynamic_frame.from_catalog" function of GlueContext creates a DynamicFrame, not a DataFrame, and a DynamicFrame does not support executing SQL queries.

    To execute SQL queries you first need to convert the DynamicFrame to a DataFrame, register a temporary table (view) in Spark's memory, and then run the SQL query against that temp table.

    Sample code:

    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    glueContext = GlueContext(SparkContext.getOrCreate())
    spark_session = glueContext.spark_session

    # Read the catalog table as a DynamicFrame, then convert it to a Spark DataFrame
    DyF = glueContext.create_dynamic_frame.from_catalog(database="{{database}}", table_name="{{table_name}}")
    df = DyF.toDF()

    # Register the DataFrame as a temporary view and run SQL against it
    df.createOrReplaceTempView('{{name}}')
    df = spark_session.sql('{{your select query with table name that you used for temp table above}}')
    df.write.format('{{orc/parquet/whatever}}').partitionBy("{{columns}}").save('path to s3 location')
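
    Applied to the job in the question, a minimal sketch might look like the following. The aggregation query itself is an assumption (the exact SELECT/SUM/GROUP BY was not given), while the database, table, column and S3 bucket names are taken from the question:

    import sys
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    glueContext = GlueContext(SparkContext.getOrCreate())
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    # Read the catalog table and convert the DynamicFrame to a DataFrame
    dyf = glueContext.create_dynamic_frame.from_catalog(
        database="db1", table_name="dbo1_expdb_dbo_stg_plan")
    df = dyf.toDF()

    # Register a temp view and run the aggregation
    # (hypothetical query -- adjust the columns to whatever you need to SUM and GROUP BY)
    df.createOrReplaceTempView("stg_plan")
    result = spark.sql("""
        SELECT plan_code, SUM(plan_id) AS plan_id_sum
        FROM stg_plan
        GROUP BY plan_code
    """)

    # Write the aggregated result back to S3 as a single CSV file with a header
    result.coalesce(1).write.mode("overwrite").option("header", "true").csv("s3://bucket/output/")
    job.commit()

    If you prefer to keep the Glue-style sink from your original job, you can also convert the result back with DynamicFrame.fromDF(result, glueContext, "result") and pass it to glueContext.write_dynamic_frame.from_options with format = "csv".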