Tags: python, multiprocessing, aws-glue

Python multiprocessing within the same AWS Glue 4.0 job hangs


I am trying to use Python Multiprocessing to process data in parallel within the same AWS Glue 4.0 job. I know that I could use Glue Workflows with multiple jobs to achieve parallel data processing, but for reasons that are irrelevant here, it is something that I don't want to do.

This is my Python code:

from multiprocessing import Pool
import sys
import time
import random

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print(f"{args['JOB_NAME']} STARTED")

def worker(table_name, tmp_dir):
    print(f"STARTED WORKER: {table_name}")
    data = load_data(table_name, tmp_dir)
    process_data(table_name, data)
    print(f"FINISHED WORKER: {table_name}")
    
def load_data(table_name, tmp_dir):    
    print(f"LOADING: {table_name}")
    data = glueContext.create_dynamic_frame.from_catalog(database="my_database",
                                                         table_name=table_name,
                                                         redshift_tmp_dir=f"{tmp_dir}/{table_name}",
                                                         transformation_ctx=f"data_source_{table_name}")
    time.sleep(random.randint(1, 5))  # added here to simulate different loading times
    print(f"LOADED: {table_name} has {data.count()} rows")
    return data

def process_data(table_name, data):
    print(f"PROCESSING: {table_name}")
    # do something
    time.sleep(random.randint(1, 5))  # added here to simulate different processing times
    print(f"DONE: {table_name}")

pool = Pool(4)
tables = ['TABLE1', 'TABLE2', 'TABLE3', 'TABLE4', 'TABLE5', 'TABLE6', 'TABLE7', 'TABLE8', 'TABLE9']
for table in tables:
    pool.apply_async(worker, args=(table, args['TempDir']))
pool.close()
pool.join()

print(f"{args['JOB_NAME']} COMPLETED")
job.commit()

Unfortunately, although it appears to start all the workers correctly, the job hangs and never completes; it eventually hits the Glue job timeout.

This is what I see in the CloudWatch output log. There are no errors in the error log.

2023-04-19T12:01:49.566+02:00   STARTED WORKER: TABLE1 LOADING: TABLE1
2023-04-19T12:01:49.566+02:00   STARTED WORKER: TABLE2 LOADING: TABLE2
2023-04-19T12:01:49.566+02:00   STARTED WORKER: TABLE3 LOADING: TABLE3 
2023-04-19T12:01:49.566+02:00   STARTED WORKER: TABLE4 LOADING: TABLE4
2023-04-19T12:01:49.603+02:00   STARTED WORKER: TABLE5 LOADING: TABLE5
2023-04-19T12:01:49.604+02:00   STARTED WORKER: TABLE6 LOADING: TABLE6
2023-04-19T12:01:49.607+02:00   STARTED WORKER: TABLE7 LOADING: TABLE7
2023-04-19T12:01:49.608+02:00   STARTED WORKER: TABLE8 LOADING: TABLE8
2023-04-19T12:01:49.609+02:00   STARTED WORKER: TABLE9 LOADING: TABLE9

I have tried several things, but I cannot pin down exactly what the problem is, except that it appears to hang inside create_dynamic_frame.from_catalog().

Has anybody attempted to do the same and solved it? Why doesn't it work?

Thank you in advance!


Solution

  • After several attempts, and after adding extra debug output and exception handling, I found out that Python's multiprocessing doesn't work with AWS Glue. The exception raised by create_dynamic_frame.from_catalog() is "JsonOptions does not exist in the JVM", and I couldn't get any further. (Note that pool.apply_async() silently swallows worker exceptions unless you later call .get() on the result, which is why nothing showed up in the error log.) The likely cause is that multiprocessing spawns separate Python processes that do not inherit the driver's py4j connection to the JVM, so the GlueContext is unusable inside the workers.

    However, replacing multiprocessing.Pool() with concurrent.futures.ThreadPoolExecutor() worked: threads run inside the same process as the driver and share its JVM gateway, and I can now process data in parallel within the same Glue job.
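    A minimal sketch of the fix, with the Glue-specific load_data()/process_data() calls replaced by a placeholder worker (the threading pattern is the same either way; only the worker body would differ in the real job):

    ```python
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import random
    import time

    def worker(table_name):
        # In the real job this would call load_data()/process_data(),
        # which use the shared GlueContext; threads can do that because
        # they run in the same process as the driver's py4j gateway.
        time.sleep(random.uniform(0.01, 0.05))  # simulate varying runtimes
        return f"DONE: {table_name}"

    tables = [f"TABLE{i}" for i in range(1, 10)]
    results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(worker, t) for t in tables]
        for future in as_completed(futures):
            # .result() re-raises any exception from the worker, so
            # failures surface immediately instead of being swallowed
            # the way apply_async() does without a later .get()
            results.append(future.result())

    print(results)
    ```

    The with-block also replaces the explicit pool.close()/pool.join() pair: exiting the context manager waits for all submitted tasks to finish before the job moves on to job.commit().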