Tags: pyspark, random, hadoop-yarn, parquet, amazon-emr

Creating a 50 GB Parquet file of random integers using PySpark fails


I've tried clusters of different sizes (EMR on AWS), and the job always fails because YARN kills all the nodes: https://aws.amazon.com/premiumsupport/knowledge-center/emr-exit-status-100-lost-node/

I assume it's due to memory requirements being too high, but even with a cluster of 10 m5.4xlarge instances (64 GB RAM each) it still failed.

PySpark code:


    import math
    import os

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import rand

    # size_in_mb and out_folder are provided by the caller
    num_of_ints = int(size_in_mb * 1024 * 1024 / 4)  # 4 bytes per 32-bit integer
    max_int = 2147483647

    # Create a SparkSession
    spark = SparkSession \
        .builder \
        .appName("GenerateRandomData") \
        .getOrCreate()

    # Generate a DataFrame with num_of_ints rows and a column named "value" between 0 and max_int
    df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer"))

    # Save the DataFrame to a Parquet file
    out_file = os.path.join(out_folder, 'random_list.parquet')
    partitions = math.ceil(size_in_mb / 10000)  # break the Parquet output into chunks of roughly 10 GB
    df.repartition(partitions).write.mode("overwrite").parquet(out_file)

    # Stop the SparkSession
    spark.stop()

I am open to any other way of creating a 50 GB Parquet file filled with random integers.

Also, the data generation stage is only being run by 2 tasks, even though I have around 140 cores in my cluster (as seen in the Spark UI).

Thanks!


Solution

  • At the heart of Delta tables are Parquet files. As we know, in Spark you end up with multiple partitions unless you call repartition(1).

    Let's use an iterative approach and keep adding data to our Delta table until the number of rows or the size matches your objective.

    The code below creates a new database.

    %sql
    CREATE DATABASE stack
    

    The code below creates a new table.

    %sql 
    CREATE TABLE someints (id INT, value INT, stamp STRING);
    

    Create a function to add data to the delta table.

    # required libraries
    from pyspark.sql.functions import lit, rand
    from datetime import datetime

    def add_random_data():

      # get variables
      num_of_ints = int(1024 * 1024)
      max_int = 2147483647
      stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
      table_name = "stack.someints"

      # create dataframe; note that the fixed seed reproduces the same
      # "value" column on every call (drop seed=42 if batches should differ)
      df = spark.range(num_of_ints) \
        .withColumn("value", (rand(seed=42) * max_int).cast("integer")) \
        .withColumn("stamp", lit(stamp))

      # append the new rows to the Delta table
      df.write.mode("append").format("delta").saveAsTable(table_name)
    

    I am adding a million rows per call. You can use a for loop over a range to call the add_random_data() function repeatedly; a minimal sketch follows.
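    For example (a sketch; the iteration count is an assumption and depends on how many bytes each one-million-row batch adds):

    # call add_random_data() repeatedly; the iteration count is only an
    # example -- tune it, or check the table size between calls, until the
    # table is as large as you need
    for _ in range(50):
        add_random_data()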

    %sql
    select stamp, count(*) as cnt from stack.someints group by stamp
    

    I am using the stamp column to capture the date/time at which the function was called.


    Last but not least, we can query the Delta table's detail to get the total size in bytes.

    # get size in bytes
    spark.sql("describe detail stack.someints").select("sizeInBytes").collect()
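    That makes it easy to drive the loop by size instead of a fixed iteration count (a sketch; the 50 GB target comes from the question):

    # keep appending one-million-row batches until the table reaches ~50 GB
    target_bytes = 50 * 1024**3

    def table_size_bytes():
        return spark.sql("describe detail stack.someints").collect()[0]["sizeInBytes"]

    while table_size_bytes() < target_bytes:
        add_random_data()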
    


    Once you have the correct size, you can always write the data out to any format you want.
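    For instance, a sketch of exporting the Delta table to Parquet (the output path and partition count below are assumptions, not part of the original answer):

    # read the accumulated Delta table and write it out as Parquet;
    # the path is hypothetical, and repartition() controls how many
    # Parquet part files are produced
    spark.table("stack.someints") \
        .repartition(5) \
        .write.mode("overwrite") \
        .parquet("s3://my-bucket/random_list.parquet")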

    Just remember that Parquet is a columnar format, so it may take a while to produce a 50 GB file. However, this iterative approach should avoid the out-of-memory issue.