apache-spark, pyspark

Will this code execute efficiently in PySpark for a large dataset?


I am very new to Spark, hence this question. I have a small dataset in a folder (with subfolders for categories, e.g. car model names), and each row in this dataset has a partitionId column. What I am trying to achieve is to write PySpark code that repartitions the data so that the output folders are partitioned by this partitionId (one folder per distinct value), with all rows belonging to a given partition written into that folder.

I have the following code, which works on a small dataset without any issues. My question is: will this same code run efficiently on a very large dataset (let's say 25 TB of data)? The reason I ask is that I am not sure whether the driver or the executors materialize these dataframes during the iteration on a single machine instead of processing them in a distributed fashion.

from pyspark.sql import SparkSession
import os
import sys
from pyspark.sql import functions as F

...

builder = SparkSession.builder
spark = builder.getOrCreate()

root_folder_path   = "<ROOT_PATH>"
output_folder_path = "<OUTPUT_PATH>"

# Get a list of categories inside the directory
folder_names = [folder for folder in os.listdir(root_folder_path) if os.path.isdir(os.path.join(root_folder_path, folder))]

# Iterate over the list of folder names (or categories)
for category_name in folder_names:
    # Read the entity parquet files for this category
    entity_df = spark.read.parquet(os.path.join(root_folder_path, category_name))
    
    # Collect the distinct partition ID values to the driver
    distinct_data_partition_ids = [row[0] for row in entity_df.select("partitionId").distinct().collect()]

    for data_partition_id in distinct_data_partition_ids:
        # Filter rows for the current dataPartitionId
        filtered_df = entity_df.filter(entity_df["partitionId"] == data_partition_id)

        # Construct the output directory for this partition ID
        output_directory = os.path.join(output_folder_path, category_name, str(data_partition_id))
    
        # Write rows to the corresponding output directory
        filtered_df.write.parquet(output_directory, mode="overwrite")

Solution

  • We can do this in a simpler way by using partitionBy():

    for category_name in folder_names:
        # Read the entity parquet files for this category
        entity_df = spark.read.parquet(os.path.join(root_folder_path, category_name))

        # Write all rows for this category, partitioned by partitionId
        entity_df.write \
            .partitionBy("partitionId") \
            .mode("overwrite") \
            .parquet(os.path.join(output_folder_path, category_name))
    

    This will create one subfolder per partitionId value (e.g. partitionId=123) under each category's output folder. The write runs as a single distributed job on the executors, so the driver never has to collect the distinct partition IDs or loop over them.
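
    One thing to watch with partitionBy() on a large dataset: if each Spark task holds rows for many partitionId values, it writes a file into every matching partition directory, which can produce a very large number of small files. A minimal sketch of one common mitigation, assuming the same paths and column name as above, is to repartition by the partition column before the write so each directory is written by fewer tasks; whether the extra shuffle pays off at 25 TB depends on the data skew and is worth benchmarking:

    for category_name in folder_names:
        entity_df = spark.read.parquet(os.path.join(root_folder_path, category_name))

        # Shuffle rows so that each partitionId value lands in as few tasks as
        # possible before writing; this trades an extra shuffle for fewer,
        # larger output files per partition directory.
        (entity_df
            .repartition("partitionId")
            .write
            .partitionBy("partitionId")
            .mode("overwrite")
            .parquet(os.path.join(output_folder_path, category_name)))

    Downstream reads that filter on partitionId will then only scan the matching subfolders (partition pruning).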