Tags: amazon-web-services, pyspark, aws-glue

Why does my AWS Glue job use only one executor and the driver?


In my script, I converted all DynamicFrames to DataFrames in PySpark and performed the groupBy and join operations. Then, in the metrics view, I found that only one executor is active, no matter how many DPUs I set.

The job failed after about 2 hours with:

Diagnostics: Container [pid=8417,containerID=container_1532458272694_0001_01_000001] is running beyond physical memory limits. Current usage: 5.5 GB of 5.5 GB physical memory used; 7.7 GB of 27.5 GB virtual memory used. Killing container.

I have about 2 billion rows of data, and the DPU setting is 80.

import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import count

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "in_json", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "out_json", transformation_ctx = "datasource1")


applymapping0 = ApplyMapping.apply(frame = datasource0, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping0")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping1")

df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))

result_joined = df1.join(df2, "fieldA")

result = DynamicFrame.fromDF(result_joined, glueContext, "result")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = {"path": "s3://test-bucket"}, format = "json", transformation_ctx = "datasink2")
job.commit()
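
For reference, this is roughly how I checked what the job actually got, using the sc from the script above. The _jsc handle is a private PySpark attribute, so this is only a best-effort diagnostic, not a stable API:

# Number of registered executors (the count includes the driver).
print("Executors incl. driver:", sc._jsc.sc().getExecutorMemoryStatus().size())

# Effective Spark configuration, to see what memory settings the job was given.
for key, value in sorted(sc.getConf().getAll()):
    print(key, "=", value)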

Did I miss anything?


Solution

  • It turns out it was because my input data is too large, so the job got stuck at the beginning, where only one executor was active. Once the calculation actually began, I saw multiple executors become active.

    Repartitioning with df1.repartition(df1["fieldA"]) actually made it slower; maybe I'm not using it correctly. A rough sketch of that experiment is below.
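
For context, a rough sketch of the repartition experiment (the partition count of 400 and repartitioning before the groupBy are illustrative assumptions, not the exact code I ran):

from pyspark.sql.functions import count

# Spread the shuffle across executors by the join key before aggregating.
df1 = (applymapping0.toDF()
       .repartition(400, "fieldA")
       .groupBy("fieldA")
       .agg(count("*").alias("total_number_1")))

df2 = (applymapping1.toDF()
       .repartition(400, "fieldA")
       .groupBy("fieldA")
       .agg(count("*").alias("total_number_2")))

# Join on the shared aggregation key.
result_joined = df1.join(df2, "fieldA")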