Tags: apache-spark, pyspark, aws-glue, aws-glue-spark

How to run parallel threads in AWS Glue PySpark?


I have a Spark job that just pulls data from multiple tables, applying the same transforms to each. Basically a for loop that iterates over a list of tables, queries the catalog table, adds a timestamp, then shoves the result into Redshift (example below).

This job takes around 30 minutes to complete. Is there a way to run these loads in parallel under the same Spark/Glue context? I don't want to create separate Glue jobs if I can avoid it.

```
import datetime
import os
import sys  # needed for sys.argv below

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *


# query the runtime arguments
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)

# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()

tables = []  # list of Glue Data Catalog table names to process

for table in tables:
    # target Redshift table name -- assumed here to match the catalog table name
    redshift_table_name = table

    catalog_table = glueContext.create_dynamic_frame.from_catalog(
        database="test", table_name=table, transformation_ctx=table
    )
    data_set = catalog_table.toDF().withColumn(
        "batchLoadTimestamp", lit(job_execution_timestamp)
    )

    # convert back to a Glue DynamicFrame
    export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")

    # remove null rows from dynamic frame
    non_null_records = DropNullFields.apply(
        frame=export_frame, transformation_ctx="non_null_records"
    )

    temp_dir = os.path.join(args["TempDir"], redshift_table_name)

    stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=non_null_records,
        catalog_connection=args["redshift_catalog_connection"],
        connection_options={
            "dbtable": f"{args['target_schema']}.{redshift_table_name}",
            "database": args["target_database"],
            "preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
        },
        redshift_tmp_dir=temp_dir,
        transformation_ctx="stores_redshiftSink",
    )

# commit the job so bookmarks (transformation_ctx) are saved
job.commit()
```

Solution

  • You can do the following things to make this process faster:

    1. Enable concurrent execution of the job.
    2. Allocate a sufficient number of DPUs.
    3. Pass the list of tables as a job parameter (a sketch of this is shown below).
    4. Execute the job in parallel using Glue workflows or Step Functions.

    Now suppose you have 100 tables to ingest; you can divide the list into batches of 10 tables each and run the job concurrently 10 times.

    Since your data will be loaded in parallel, the Glue job runs will finish sooner and less cost will be incurred.
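    For item 3, a minimal sketch of how a small driver script (or a workflow/Step Functions task) might split the table list into batches and start concurrent runs of the same job. The `--table_list` parameter name, the batch size, the placeholder table names, and the `my-table-loader` job name are all assumptions for illustration:

    ```
    import boto3

    # hypothetical driver that fans the table list out across concurrent runs
    # of the existing Glue job (its "Maximum concurrency" must allow this)
    glue = boto3.client("glue")

    # full list of catalog tables to ingest -- placeholder names
    all_tables = [f"table_{i:03d}" for i in range(100)]

    BATCH_SIZE = 10  # 100 tables -> 10 concurrent runs of 10 tables each

    for i in range(0, len(all_tables), BATCH_SIZE):
        batch = all_tables[i : i + BATCH_SIZE]
        glue.start_job_run(
            JobName="my-table-loader",                    # assumed job name
            Arguments={"--table_list": ",".join(batch)},  # assumed parameter name
        )
    ```

    Inside the job, `table_list` would then be added to the `getResolvedOptions` list and the hard-coded `tables = []` replaced with something like `tables = args["table_list"].split(",")`.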

    An alternate approach that will be much faster is to use Redshift's COPY utility directly.

    1. Create the table in Redshift with the batchLoadTimestamp column defaulting to current_timestamp.
    2. Build the COPY command to load data into the table directly from S3.
    3. Run the COPY command from a Glue Python shell job using pg8000 (a sketch is shown at the end of this answer).

    Why will this approach be faster? Because the Spark Redshift JDBC connector first unloads the Spark DataFrame to S3 and then issues a COPY command against the Redshift table. By running the COPY command directly, you remove the overhead of the unload step and of reading the data into a Spark DataFrame in the first place.
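
    For step 3, a minimal sketch of a Glue Python shell job issuing the COPY with pg8000. The connection details, table name, column list, S3 path, and IAM role below are placeholders; in practice they would come from job parameters or Secrets Manager rather than being hard-coded:

    ```
    import pg8000

    # placeholder connection details -- fetch these from job parameters or
    # AWS Secrets Manager instead of hard-coding them
    conn = pg8000.connect(
        host="my-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com",
        port=5439,
        database="target_db",
        user="load_user",
        password="********",
    )

    # batchLoadTimestamp is left out of the column list so that its
    # current_timestamp default (step 1) is applied on load
    copy_sql = """
        COPY target_schema.my_table (col_a, col_b)
        FROM 's3://my-bucket/exports/my_table/'
        IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
        CSV GZIP;
    """

    cur = conn.cursor()
    cur.execute("TRUNCATE TABLE target_schema.my_table;")  # same preaction as the JDBC sink
    cur.execute(copy_sql)
    conn.commit()  # make the load visible
    conn.close()
    ```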