I am trying to use Python multiprocessing to process data in parallel within the same AWS Glue 4.0 job. I know I could achieve parallel data processing with Glue Workflows and multiple jobs, but for reasons that are irrelevant here, that is not an option for me.
This is my Python code:
from multiprocessing import Pool
import sys
import time
import random
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print(f"{args['JOB_NAME']} STARTED")
def worker(table_name, tmp_dir):
    print(f"STARTED WORKER: {table_name}")
    data = load_data(table_name, tmp_dir)
    process_data(table_name, data)
    print(f"FINISHED WORKER: {table_name}")

def load_data(table_name, tmp_dir):
    print(f"LOADING: {table_name}")
    data = glueContext.create_dynamic_frame.from_catalog(database="my_database",
                                                         table_name=table_name,
                                                         redshift_tmp_dir=f"{tmp_dir}/{table_name}",
                                                         transformation_ctx=f"data_source_{table_name}")
    time.sleep(random.randint(1, 5))  # added here to simulate different loading times
    print(f"LOADED: {table_name} has {data.count()} rows")
    return data

def process_data(table_name, data):
    print(f"PROCESSING: {table_name}")
    # do something
    time.sleep(random.randint(1, 5))  # added here to simulate different processing times
    print(f"DONE: {table_name}")
pool = Pool(4)
tables = ['TABLE1', 'TABLE2', 'TABLE3', 'TABLE4', 'TABLE5', 'TABLE6', 'TABLE7', 'TABLE8', 'TABLE9']
for table in tables:
    pool.apply_async(worker, args=(table, args['TempDir']))
pool.close()
pool.join()
print(f"{args['JOB_NAME']} COMPLETED")
job.commit()
Unfortunately, while it seems to start all the workers correctly, the job then hangs and never completes, running until Glue finally times it out.
This is what I see in the CloudWatch output log. There are no errors in the error log.
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE1 LOADING: TABLE1
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE2 LOADING: TABLE2
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE3 LOADING: TABLE3
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE4 LOADING: TABLE4
2023-04-19T12:01:49.603+02:00 STARTED WORKER: TABLE5 LOADING: TABLE5
2023-04-19T12:01:49.604+02:00 STARTED WORKER: TABLE6 LOADING: TABLE6
2023-04-19T12:01:49.607+02:00 STARTED WORKER: TABLE7 LOADING: TABLE7
2023-04-19T12:01:49.608+02:00 STARTED WORKER: TABLE8 LOADING: TABLE8
2023-04-19T12:01:49.609+02:00 STARTED WORKER: TABLE9 LOADING: TABLE9
I have tried several things, but I cannot understand exactly what the problem is, except that it seems to be hanging on create_dynamic_frame.from_catalog().
Has anybody attempted to do the same and solved it? Why doesn't it work?
Thank you in advance!
After several attempts, and after adding more debugging output and exception handling, I found out that Python's multiprocessing doesn't work with AWS Glue. The error I got from create_dynamic_frame.from_catalog() is "JsonOptions does not exist in the JVM", and I couldn't get any further than that.
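In case it helps anyone else debug something similar: pool.apply_async() silently swallows worker exceptions unless you keep the returned AsyncResult and call .get() on it. This is a minimal sketch of the kind of exception handling I added to surface the error (worker, tables, and args are the same as in the question):

results = [pool.apply_async(worker, args=(table, args['TempDir'])) for table in tables]
pool.close()
for result in results:
    try:
        result.get()  # blocks until the task finishes and re-raises any exception from the worker
    except Exception as exc:
        print(f"WORKER FAILED: {exc}")
pool.join()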
However, replacing multiprocessing.Pool() with concurrent.futures.ThreadPoolExecutor() worked, and I can now run tasks in parallel within the same Glue job.
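For reference, this is roughly what the working version looks like (a minimal sketch; worker, tables, and args are unchanged from the question):

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(worker, table, args['TempDir']) for table in tables]
    for future in futures:
        future.result()  # blocks until the task finishes and re-raises any worker exception

print(f"{args['JOB_NAME']} COMPLETED")
job.commit()

My understanding of why this works: threads all share the driver's single SparkContext and its Py4J gateway to the JVM, whereas the child processes created by multiprocessing don't inherit a usable connection to the JVM, which would explain why Py4J classes such as JsonOptions can't be found there.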