apache-spark pyspark parallel-processing aws-glue distributed-system

How do I parallelize writing a list of PySpark DataFrames across all worker nodes?


I have a basic AWS Glue 4.0 job that runs a transform function, which returns a list of DataFrames:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from transform import transform
from pyspark.sql.functions import lit
from datetime import datetime

# ========================================== CONTEXT INITIALIZATION ====================================================
args = getResolvedOptions(sys.argv, ['JOB_NAME'])  # the option names must be passed explicitly

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ======================================================================================================================

# `inputs` is built elsewhere in the job (not shown in the question)
pyspark_df_list = transform(inputs)

# NOT SURE what to do here to achieve parallelization

# ======================================================================================================================

job.commit()

Things I've tried:

  1. Iterating through the list and writing each DataFrame in turn seems to break the parallelization: the write operations effectively run one at a time.
  2. Creating a write_df function and trying to call parallelize/foreach on the SparkContext gives me the following error:

RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


Solution

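  • Each individual DataFrame write is already parallelized across the worker nodes; what the loop serializes is the sequence of separate write jobs. To run several writes at once, you have a few options: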

    1. In Scala you can use parallel collections (.par). That said, how much concurrency you actually get depends on the resource allocation mode used by your resource manager, e.g. YARN.
    2. Alternatively, you can write N separate Spark apps.
    3. Or you can run N independent actions in a single Spark app, one for each of the N DataFrames. This works in both Scala and PySpark.
    4. For PySpark you can use ThreadPoolExecutor. See https://gist.github.com/pavel-filatov/87a68dd621546b9cac1e0d2ea269705f for an excellent explanation of the .par equivalent in PySpark.

    I am not sure I would go down the latter or the .par route myself.
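A minimal sketch of the ThreadPoolExecutor option, assuming each write is a blocking Spark action started from the driver. The helper name run_concurrently, the write_df callback and the S3 prefix are hypothetical. Because the threads only submit jobs on the driver and nothing references the SparkContext inside a task, this approach does not hit the SPARK-5063 error shown in the question.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_concurrently(
    items: Sequence[T],
    action: Callable[[int, T], R],
    max_workers: int = 4,
) -> List[R]:
    """Run action(index, item) for every item on a driver-side thread pool.

    Each call is expected to trigger one blocking Spark action (for example a
    DataFrame write), so several Spark jobs can be in flight at the same time.
    Results come back in input order. If an action raises, the exception of the
    first failing item (in input order) is re-raised once the pool shuts down,
    i.e. after the remaining submitted actions have finished.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(action, i, item) for i, item in enumerate(items)]
        return [f.result() for f in futures]


# Hypothetical wiring for the Glue job in the question (placeholder S3 prefix):
#
# def write_df(i, df):
#     df.write.mode("overwrite").parquet(f"s3://your-bucket/output/df_{i}/")
#
# run_concurrently(pyspark_df_list, write_df, max_workers=4)
```

The concurrent jobs still share the same executors. With Spark's default FIFO scheduler the earliest-submitted job gets priority for free task slots, so writes overlap mainly when one job leaves slots idle; setting spark.scheduler.mode to FAIR shares slots more evenly between the jobs. Keeping max_workers modest avoids flooding the driver with simultaneous jobs.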