I am new to Glue and PySpark. I have an AWS Glue ETL PySpark job (G.2X workers, 30 DPUs) that reads data from an S3-backed Glue table (no partitions defined) with 15B rows. It applies several filters and finally tries to write just 8 rows to a file in S3. While all the steps related to filtering and loading the data into a DataFrame finish in a few minutes, writing the file to S3 takes over 45 minutes. Is there a way to reduce the time of the write step? I am aware of lazy execution in PySpark, but if the final action always takes this long, I don't understand how it is useful.
Below is the function used to extract the data and write files in S3 (data is already processed before this step):
import logging
import threading
from functools import reduce

import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F

# Initialize GlueContext, Logger and boto3 client
sc = SparkContext()
glueContext = GlueContext(sc)
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)
client = boto3.client('s3')

def extract_test_data(tests, df):
    def process_test(test, data_frame):
        test_name = test["name"]
        conditions = test["conditions"]
        logger.info(f"Extracting data for IT : {test_name}")
        filter_condition = reduce(lambda a, b: a & b, conditions)
        # Filter the DataFrame based on the conditions
        df_test = data_frame.filter(filter_condition).limit(1)  # <--- only one row is selected
        df_test_with_test_name = df_test.withColumn("test_name", F.lit(test_name))
        selected_columns = ["test_name", "titleset_id", "dmid", "entity_id"]
        df_selected = df_test_with_test_name.select(*selected_columns)
        logger.info(f"Preparing to write the data for test : {test_name}")
        dfs_to_write.append(df_selected)
        logger.info(f"Completed data extraction for IT : {test_name}")

    # Create a list to hold the threads
    threads = []
    dfs_to_write = []
    # Create and start a thread for each test
    for test in tests:
        thread = threading.Thread(target=process_test, args=(test, df,))
        thread.start()
        threads.append(thread)
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    logger.info("Completed all the computations")
    # Union all the DataFrames from each test into a single DataFrame
    final_df = reduce(lambda df1, df2: df1.union(df2), dfs_to_write)
    logger.info("Completed union of all data frames, now write the file")
    final_df.write.mode("overwrite").parquet(BUCKET_URL + "test_data/" + "test_data_file")
The function runs up to logger.info("Completed union of all data frames, now write the file") in a few minutes and then takes forever to finish.
Is there any optimisation that can make this process faster?
I have also tried df.explain(), but I couldn't make much sense of the output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Project [AlbumWithIGTrack AS test_name#84, titleset_id#36, dmid#40L, entity_id#28]
   :  +- TakeOrderedAndProject(limit=1, orderBy=[event_time#35 DESC NULLS LAST], output=[entity_id#28,titleset_id#36,dmid#40L])
   :     +- Project [entity_id#28, event_time#35, titleset_id#36, dmid#40L]
   :        +- Filter (((isnotnull(action#30) AND isnotnull(action_details#31)) AND isnotnull(earliest_ord#37)) AND ((rn#55 = 1) AND ((((action#30 = ManualVerify) AND (action_details#31 = Albums with Instant Gratification tracks ignored from auto verification.)) AND (cast(earliest_ord#37 as timestamp) >= 2023-07-07 06:14:21.267558)) AND (cast(earliest_ord#37 as timestamp) <= 2024-05-22 06:14:21.267564))))
   :           +- Window [row_number() windowspecdefinition(titleset_id#36, event_time#35 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#55], [titleset_id#36], [event_time#35 DESC NULLS LAST]
   :              +- Sort [titleset_id#36 ASC NULLS FIRST, event_time#35 DESC NULLS LAST], false, 0
   :                 +- Exchange hashpartitioning(titleset_id#36, 112), ENSURE_REQUIREMENTS, [id=#311]
   :                    +- Project [entity_id#28, action#30, action_details#31, event_time#35, titleset_id#36, earliest_ord#37, dmid#40L]
   :                       +- Filter ((isnotnull(entity_id#28) AND isnotnull(event_time#35)) AND (((((NOT action#30 IN (Failed,Filtered) AND (NOT action_details#31 IN (ORD is already verified.,TitleSet Member(s) has already been released.) OR isnull(action_details#31))) AND isnotnull(dmid#40L)) AND (isnotnull(titleset_id#36) AND NOT (titleset_id#36 = entity_id#28))) AND (isnotnull(is_audit#39) AND NOT cast(is_audit#39 as boolean))) AND (event_time#35 >= 2023-07-20 06:14:20.85565)))
   :                          +- Scan ExistingRDD[entity_id#28,entity_type#29,action#30,action_details#31,ord_before_auto_verification#32,case_verified_before_auto_verification#33,auto_verified_ord#34,event_time#35,titleset_id#36,earliest_ord#37,earliest_eligibility_start_date#38,is_audit#39,dmid#40L]
   :- Project [PastVerifiedEntityWithUnchangedORD AS test_name#85, titleset_id#224, dmid#228L, entity_id#216]
   :  +- TakeOrderedAndProject(limit=1, orderBy=[event_time#223 DESC NULLS LAST], output=[entity_id#216,titleset_id#224,dmid#228L])
   :     +- Project [entity_id#216, event_time#223, titleset_id#224, dmid#228L]
   :        +- Filter ((((isnotnull(action#218) AND isnotnull(earliest_ord#225)) AND isnotnull(earliest_eligibility_start_date#226)) AND isnotnull(case_verified_before_auto_verification#221)) AND ((rn#55 = 1) AND ((((((action#218 = CatalogUpdate) AND (cast(earliest_ord#225 as timestamp) >= 2023-07-07 06:14:21.208448)) AND (cast(earliest_ord#225 as timestamp) <= 2024-05-22 06:14:21.208456)) AND (cast(earliest_eligibility_start_date#226 as timestamp) >= 2023-07-07 06:14:21.22124)) AND (cast(earliest_eligibility_start_date#226 as timestamp) <= 2024-05-22 06:14:21.221249)) AND NOT case_verified_before_auto_verification#221)))
   :           +- Window [row_number() windowspecdefinition(titleset_id#224, event_time#223 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#55], [titleset_id#224], [event_time#223 DESC NULLS LAST]
   :              +- Sort [titleset_id#224 ASC NULLS FIRST, event_time#223 DESC NULLS LAST], false, 0
   :                 +- Exchange hashpartitioning(titleset_id#224, 112), ENSURE_REQUIREMENTS, [id=#318]
   :                    +- Project [entity_id#216, action#218, case_verified_before_auto_verification#221, event_time#223, titleset_id#224, earliest_ord#225, earliest_eligibility_start_date#226, dmid#228L]
   :                       +- Filter ((isnotnull(entity_id#216) AND isnotnull(event_time#223)) AND (((((NOT action#218 IN (Failed,Filtered) AND (NOT action_details#219 IN (ORD is already verified.,TitleSet Member(s) has already been released.) OR isnull(action_details#219))) AND isnotnull(dmid#228L)) AND (isnotnull(titleset_id#224) AND NOT (titleset_id#224 = entity_id#216))) AND (isnotnull(is_audit#227) AND NOT cast(is_audit#227 as boolean))) AND (event_time#223 >= 2023-07-20 06:14:20.85565)))
...
...
Any help would be greatly appreciated!
I have tried writing the data in formats other than Parquet, but saw no significant improvement.
I also tried converting the PySpark DataFrame to a pandas DataFrame.
Spark performs lazy evaluation. According to the official documentation:
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
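As a rough plain-Python analogy (generators, not Spark itself), the split between transformations and actions can be sketched as follows. The function names and the sample data are made up for illustration.

```python
calls = []  # records when the "scan" actually touches a row


def scan(rows):
    # "Transformation": yields rows lazily; nothing runs until consumed.
    for row in rows:
        calls.append(row)
        yield row


def keep_even(rows):
    # "Transformation": a lazy filter stacked on top of the scan.
    for row in rows:
        if row % 2 == 0:
            yield row


# Building the pipeline returns immediately and does no work.
pipeline = keep_even(scan(range(6)))
work_before_action = len(calls)

# "Action": consuming the pipeline runs the scan and the filter together.
result = list(pipeline)
work_after_action = len(calls)

print(work_before_action, work_after_action, result)  # 0 6 [0, 2, 4]
```

All of the time is spent inside list(...), even though that line only "collects" three values. The same holds for a Spark action: its duration includes every upstream step.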
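In your code, Spark doesn't compute any data until it reaches the line final_df.write.mode(...).parquet(...). The 45 minutes attributed to the write is therefore the time for the whole pipeline (scan, filters, shuffle and window), not for writing 8 rows. Threads are also unnecessary: you can just use a for loop here, since these operations are only transformations, not actions.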
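A minimal sketch of a loop-based version, assuming tests and df are the same objects that are passed to extract_test_data in the question. The name build_test_data is hypothetical, and selectExpr with a SQL string literal stands in for withColumn plus F.lit only to keep the sketch free of extra imports (this assumes test names contain no single quotes).

```python
from functools import reduce
from operator import and_

SELECTED_COLUMNS = ["titleset_id", "dmid", "entity_id"]


def build_test_data(tests, df):
    """Return one lazy DataFrame holding at most one row per test."""
    frames = []
    for test in tests:
        name = test["name"]
        # Combine the per-test conditions (PySpark Column objects) with AND.
        condition = reduce(and_, test["conditions"])
        # Transformations only: no Spark job is triggered here.
        one_row = df.filter(condition).limit(1)
        # Assumption: `name` has no single quotes, so it is safe to embed
        # as a SQL string literal.
        frames.append(one_row.selectExpr(f"'{name}' AS test_name", *SELECTED_COLUMNS))
    # Still lazy: the union only extends the query plan.
    return reduce(lambda left, right: left.union(right), frames)
```

Calling build_test_data(tests, df) should return almost immediately. The work starts only when an action such as .write.mode("overwrite").parquet(...) is called on the result, which is why that step appears to take all the time.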
Check the Spark Web UI or history server to see which part is the actual bottleneck.