I've inherited some code that runs incredibly slowly on AWS Glue.
Within the job it creates a number of dynamic frames that are then joined using spark.sql. Tables are read from a MySQL and Postgres db and then Glue is used to join them together to finally write another table back to Postgres.
Example (note dbs etc have been renamed and simplified as I can't paste my actual code directly)
jobName = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(jobName, args)
# MySQL
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "trans").toDF().createOrReplaceTempView("trans")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "types").toDF().createOrReplaceTempView("types")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "currency").toDF().createOrReplaceTempView("currency")
# DB2 (Postgres)
glueContext.create_dynamic_frame.from_catalog(database = "db2", table_name = "watermark").toDF().createOrReplaceTempView("watermark")
# transactions
new_transactions_df = spark.sql("[SQL CODE HERE]")
# Write to DB
conf_g = glueContext.extract_jdbc_conf("My DB")
url = conf_g["url"] + "/reporting"
new_transactions_df.write.option("truncate", "true").jdbc(url, "staging.transactions", properties=conf_g, mode="overwrite")
The [SQL CODE HERE] is literally a simple select statement joining the three tables together to produce an output that is then written to the staging.transactions table.
When I last ran this it only wrote 150 rows but took 9 minutes to do so. Can somebody please point me in the direction of how to optimise this?
Additional info:
Generally, when reading or writing data in Spark through JDBC drivers, the common issue is that the operations aren't parallelized. Here are some optimizations you might want to try:
From the code you provided, it seems that all of the table data is read using a single query and a single Spark executor.
If you use the Spark DataFrame reader directly, you can set the options partitionColumn, lowerBound, upperBound, numPartitions and fetchsize to read multiple partitions in parallel using multiple workers, as described in the docs. Example:
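# Parentheses allow the comment line inside the chain; a comment after a
# backslash continuation would be a syntax error.
df = (
    spark.read.format("jdbc")
    # ...
    .option("partitionColumn", "partition_key")
    .option("lowerBound", "<lb>")
    .option("upperBound", "<ub>")
    .option("numPartitions", "<np>")
    .option("fetchsize", "<fs>")
    .load()
)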
When using read partitioning, note that Spark will issue multiple queries in parallel, so make sure the db engine can handle that load, and make sure the partition column is indexed to avoid a full table scan for every query.
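To make the "multiple queries in parallel" point concrete, here is a rough sketch of how the bounds are turned into one WHERE clause per partition. It approximates Spark's behaviour for an integer partition column and is not Spark's actual code; the function name is made up for illustration, and transID is borrowed from the Glue example further down.

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Approximate the per-partition WHERE clauses of a partitioned JDBC read."""
    if num_partitions < 2:
        raise ValueError("need at least 2 partitions to split the read")
    # The bounds only define the stride; they do not filter any rows out.
    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower_clause is None:
            # First partition also picks up NULLs and everything below lowerBound.
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        elif upper_clause is None:
            # Last partition is open-ended, so it picks up everything above upperBound.
            predicates.append(lower_clause)
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates


for predicate in jdbc_partition_predicates("transID", 0, 100, 4):
    print(predicate)
```

Each printed clause corresponds to one query sent to the database. If the real values are concentrated outside the bounds, most rows land in the first or last partition, so pick bounds close to the actual min and max of the column.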
In AWS Glue, this can be done by passing additional options using the parameter additional_options:
To use a JDBC connection that performs parallel reads, you can set the hashfield, hashexpression, or hashpartitions options:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
additional_options = {"hashfield": "transID", "hashpartitions": "10"}
).toDF().createOrReplaceTempView("trans")
This is described in the Glue docs: Reading from JDBC Tables in Parallel
Use the batchsize option when writing: In your particular case I'm not sure this will help, as you write only 150 rows, but you can specify this option to improve write performance:
# Parentheses allow the comment line inside the chain.
(
    new_transactions_df.write.format("jdbc")
    # ...
    .option("batchsize", "10000")
    .save()
)
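A back-of-the-envelope sketch of why batchsize is unlikely to matter for 150 rows: it estimates how many insert batches a single partition sends. The helper name is hypothetical; the default of 1000 is Spark's documented default for the JDBC batchsize option.

```python
import math


def jdbc_write_batches(row_count, batchsize=1000):
    """Estimate the number of insert batches sent for one partition."""
    return math.ceil(row_count / batchsize)


print(jdbc_write_batches(150))               # default batchsize
print(jdbc_write_batches(150, 10000))        # larger batchsize, same small write
print(jdbc_write_batches(2_000_000))         # large write, default batchsize
print(jdbc_write_batches(2_000_000, 10000))  # large write, larger batchsize
```

With 150 rows both settings give a single batch, so the write itself is probably not where the 9 minutes go. The option only starts to pay off for writes in the millions of rows.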
You can also optimize reading by pushing part of the query (filters, column selection) down to the db engine instead of loading the entire table into a dynamic frame and then filtering.
In Glue, this can be done using the push_down_predicate parameter:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
push_down_predicate = "(transDate > '2021-01-01' and transStatus='OK')"
).toDF().createOrReplaceTempView("trans")
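See Glue programming ETL partitions pushdowns. Note that those docs describe push_down_predicate in terms of the partition columns of catalog tables, so check that it actually reduces the rows read from a JDBC source.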
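If you read with the plain Spark JDBC reader instead, one way to push the filter and column selection down is to pass a subquery as the dbtable option. A minimal sketch, assuming a hypothetical helper name, a placeholder JDBC URL and an illustrative column list (only transDate and transStatus come from the example above):

```python
def pushdown_jdbc_options(url, table, columns, where, alias="pushed"):
    """Build Spark JDBC reader options that run the filter inside the database.

    Values are interpolated into SQL as-is, so only pass trusted strings.
    """
    subquery = f"(SELECT {', '.join(columns)} FROM {table} WHERE {where}) AS {alias}"
    return {"url": url, "dbtable": subquery}


opts = pushdown_jdbc_options(
    url="jdbc:mysql://<host>:3306/db1",  # placeholder, not a real endpoint
    table="trans",
    columns=["transID", "transDate", "transStatus"],  # illustrative columns
    where="transDate > '2021-01-01' AND transStatus = 'OK'",
)
print(opts["dbtable"])

# With a live SparkSession (not created in this sketch) the read would be:
# trans_df = spark.read.format("jdbc").options(**opts).load()
```

The database then evaluates the WHERE clause and returns only the selected columns, so Spark never sees the rest of the table. Credentials and driver settings are left out and would have to be added to the options.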
In some cases, you could consider exporting the tables to files using the db engine and then reading from those files. The same applies when writing: first write to a file, then load it with the db's bulk insert command. This can avoid the bottleneck of using Spark with JDBC.
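For the write side on Postgres, the bulk insert command is COPY. A rough sketch of the idea, assuming the result is small enough to collect as plain Python rows and that conn is an already open psycopg2 connection; the helper names and the column names are hypothetical:

```python
import csv
import io


def rows_to_csv_buffer(rows, header):
    """Serialize rows into an in-memory CSV file with a header line."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    buf.seek(0)  # rewind so the consumer reads from the start
    return buf


def copy_into_postgres(conn, buf, table):
    """Bulk-load the CSV buffer with COPY; conn is assumed to be a psycopg2 connection."""
    sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)"
    with conn.cursor() as cur:
        cur.copy_expert(sql, buf)
    conn.commit()


buf = rows_to_csv_buffer(
    [(1, "GBP", 10.5), (2, "USD", 3.0)],
    header=("id", "currency", "amount"),  # illustrative columns
)
print(buf.getvalue())

# With an open connection (not created in this sketch):
# copy_into_postgres(conn, buf, "staging.transactions")
```

For large outputs you would write the file to S3 or local disk from Spark rather than collecting rows on the driver, but the COPY step stays the same.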