I've tried different cluster sizes (EMR on AWS), and it always fails because YARN kills all the nodes: https://aws.amazon.com/premiumsupport/knowledge-center/emr-exit-status-100-lost-node/
I assume it's due to memory requirements that are too high, but I used a cluster of 10 m5.4xlarge instances (64 GiB RAM each) and it still failed.
PySpark code:
import math
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# size_in_mb (the target size in MB) and out_folder are defined earlier in my script
num_of_ints = int(size_in_mb * 1024 * 1024 / 4)  # each integer takes 4 bytes
max_int = 2147483647

# Create a SparkSession
spark = SparkSession \
    .builder \
    .appName("GenerateRandomData") \
    .getOrCreate()

# Generate a DataFrame with num_of_ints rows and a column named "value" between 0 and max_int
df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer"))

# Save the DataFrame to a Parquet file, split into chunks of roughly 10 GB each
out_file = os.path.join(out_folder, 'random_list.parquet')
partitions = math.ceil(size_in_mb / 10000)
df.repartition(partitions).write.mode("overwrite").parquet(out_file)

# Stop the SparkSession
spark.stop()
I am open to any other way to create a 50 GB Parquet file filled with random integers.
Also, the data generation stage is only being done by 2 tasks, even though I have around 140 cores in my cluster (see the Spark UI screenshot).
Thanks!
At the heart of Delta tables are Parquet files. As we know, in Spark, unless you call repartition(1), you end up with multiple partitions.
Let's use an iterative approach that adds data to the Delta table until the number of rows or the size matches your objective.
The code below creates a new database.
%sql
CREATE DATABASE stack
The code below creates a new table.
%sql
CREATE TABLE someints (id INT, value INT, stamp STRING);
Create a function that appends data to the Delta table.
# required libraries
from datetime import datetime
from pyspark.sql.functions import lit, rand


def add_random_data():
    # get variables
    num_of_ints = int(1024 * 1024)
    max_int = 2147483647
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    table_name = "stack.someints"

    # create a dataframe with a random "value" column and a batch time stamp
    df = spark.range(num_of_ints) \
        .withColumn("value", (rand(seed=42) * max_int).cast("integer")) \
        .withColumn("stamp", lit(stamp))

    # append the batch to the delta table
    df.write.mode("append").format("delta").saveAsTable(table_name)
Each call adds a million rows. You can use a for ... in range() loop to call add_random_data() repeatedly, for example:
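Here is a minimal sketch of such a loop; the batch count of 50 is arbitrary, so pick whatever gets you to your target size:

# call the function repeatedly; each call appends about a million rows
for i in range(50):
    add_random_data()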
%sql
SELECT stamp, COUNT(*) AS cnt FROM stack.someints GROUP BY stamp
I am using the stamp column to capture the date/time at which the function was called.
Last but not least, we can use the Delta table details to get the total size in bytes.
# get size in bytes
spark.sql("describe detail stack.someints").select("sizeInBytes").collect()
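Putting the pieces together, here is one possible sketch of the iterative approach; the 50 GB target and the loop structure are just an illustration:

# keep appending batches until the table reaches roughly 50 GB
target_bytes = 50 * 1024 ** 3
size_in_bytes = 0
while size_in_bytes < target_bytes:
    add_random_data()
    size_in_bytes = spark.sql("describe detail stack.someints").select("sizeInBytes").collect()[0][0]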
Once you have reached the correct size, you can always write the data out to any format you want.
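For example, here is a sketch that reads the Delta table back and dumps just the value column as plain Parquet; the S3 path is a placeholder you would replace with your own location:

# write the random integers back out as a plain parquet file (placeholder path)
spark.table("stack.someints") \
    .select("value") \
    .write.mode("overwrite") \
    .parquet("s3://your-bucket/random_list.parquet")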
Just remember that Parquet is a columnar storage format, so it might take a while to produce a 50 GB file. However, this iterative approach should avoid the out-of-memory issue.