Search code examples
pythonapache-sparkpysparkamazon-emr

Ensuring File Size Limit is Adhered to When Batch Processing Downloads in PySpark on EMR


I'm working on a PySpark application running on Amazon EMR, where my task involves downloading files based on a url from a DataFrame. The goal is to continuously download these files on an EMR executor until a specified file size limit is reached. Once this limit is hit, I intend to bundle these files into a tar archive. However, I'm facing a challenge with enforcing the file size limit correctly before initiating the tar file creation process. I've tried to employ the mapPartitions function with the intention of batching multiple files until they collectively reach a file size limit, after which they should be archived into a single tar file.

The process should work as follows:

  1. Download files.
  2. Keep track of the cumulative size of these downloaded files.
  3. Once the cumulative size reaches a pre-defined limit, create a tar file with these files.
  4. Reset the tracking and start the process again for the next batch of files until all are processed.

Here's a simplified version of my approach:

from pyspark.sql import SparkSession
import os
import uuid
from pyspark.sql.functions import spark_partition_id

# Initialize Spark session
spark = SparkSession.builder.appName("DownloadandTar").getOrCreate()

# Sample DataFrame of files
files = [("file1",), ("file2",), ("file3",), ...]
schema = ["file"]
df = spark.createDataFrame(files, schema=schema)

# Define the file size limit (e.g., 100MB)
FILE_SIZE_LIMIT = 100 * 1024 * 1024  # 100MB

def download_and_tar_files(partition):
    accumulated_size = 0
    files_to_tar = []

    for row in partition:
        file = row.filename
        file_path, file_size = download_file(file)
        files_to_tar.append(file_path)
        accumulated_size += file_size

        if accumulated_size >= FILE_SIZE_LIMIT:
            tar_file_name = f"{uuid.uuid4()}.tar"
            create_tar_file(files_to_tar, tar_file_name)
            files_to_tar = []
            accumulated_size = 0

    # Handle any remaining files
    if files_to_tar:
        tar_file_name = f"{uuid.uuid4()}.tar"
        create_tar_file(files_to_tar, tar_file_name)

# Apply the function to each partition
df.repartition(spark_partition_id()).foreachPartition(download_and_tar_files)

Could anyone provide insight into potential issues with my size calculation or batching logic? I've also tried to employ the map function with the intention of processing each row of file name. Despite this approach, my current implementation results in creating a tar file for each individual file, failing to accumulate multiple files before reaching the size threshold. This leads to an output where each tar archive contains only one file, rather than the intended batch processing until the specified size limit is met.


Solution

  • The issue I encountered with the FILE_SIZE_LIMIT not being adhered to on AWS EMR's core nodes stems from the way global variables are managed and accessed within Spark's distributed architecture. Specifically, FILE_SIZE_LIMIT, being a global variable, was initialized on the EMR cluster's primary (master) node. However, this variable was not directly accessible on the core (executor) nodes due to each node operating in its own memory and process space.

    After a bit of deep dive I figured out that in Spark applications, the driver (running on the primary node) and the executors (running on core nodes) do not share the same memory space. This means that global variables declared in the driver's script are not automatically propagated to the executors. As a result, any logic relying on FILE_SIZE_LIMIT within executor nodes would not function as expected because the executors did not have the correct value of this variable.

    To resolve this, I needed to explicitly pass the FILE_SIZE_LIMIT value to the executor nodes. This was achieved by including it as an argument to the functions being executed on the executors.