In my script, I converted all DynamicFrames to DataFrames in PySpark and performed the groupBy and join operations. Then, in the metrics view, I found that only one executor is active no matter how many DPUs I set. The job failed after about 2 hours with:
Diagnostics: Container [pid=8417,containerID=container_1532458272694_0001_01_000001] is running beyond physical memory limits. Current usage: 5.5 GB of 5.5 GB physical memory used; 7.7 GB of 27.5 GB virtual memory used. Killing container.
I have about 2 billion rows of data, and my DPU setting is 80.
import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import count

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read both tables from the Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "in_json", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "out_json", transformation_ctx = "datasource1")

applymapping0 = ApplyMapping.apply(frame = datasource0, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping0")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping1")

# Count rows per fieldA in each table, then join the two counts on fieldA
df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))
result_joined = df1.join(df2, "fieldA")

result = DynamicFrame.fromDF(result_joined, glueContext, "result")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = {"path": "s3://test-bucket"}, format = "json", transformation_ctx = "datasink2")
job.commit()
Did I miss anything?
It turns out that this is because my input data is too large, so the job got stuck at the beginning, where only one executor is active. Once the calculation begins, I see multiple executors become active.
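One way to sanity-check how the input is split up is to print the partition count right after the read. This is just a minimal diagnostic sketch (the df_in name is only for illustration):

# Hypothetical check: the number of partitions produced by the read roughly
# bounds how many tasks (and therefore executors) can run in parallel next.
df_in = applymapping0.toDF()
print(df_in.rdd.getNumPartitions())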
Repartitioning with df1.repartition(df1["fieldB"]) actually made it slower; maybe I'm not using it correctly.
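For reference, the PySpark form I probably should have used is something like the sketch below, repartitioning by the grouping/join key with an explicit partition count before the shuffle-heavy steps (the value 200 is only an illustrative placeholder, not a tuned number):

# Hypothetical sketch: spread the raw rows across partitions by the key that
# the groupBy and join will shuffle on, using an explicit partition count.
df1 = applymapping0.toDF().repartition(200, "fieldA")
df1 = df1.groupBy("fieldA").agg(count('*').alias("total_number_1"))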