I have a Spark DataFrame with a small number of fields, some of which are huge binary blobs. The size of an entire row is approximately 50 MB.
I am saving the DataFrame in Parquet format and controlling the row-group size with the parquet.block.size parameter.
Spark generates a Parquet file, but I always get at least 100 rows per row group. This is a problem for me, since row groups can reach several gigabytes, which does not work well with my application.
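The arithmetic behind the problem can be sketched as follows, using the ~50 MB row size mentioned above and the 100-row minimum hardcoded in InternalParquetRecordWriter:

```python
# Back-of-the-envelope illustration of why the 100-row minimum dominates
# any small parquet.block.size. Numbers are from the question: ~50 MB rows,
# an 8 MiB requested block size.
row_bytes = 50 * 1024 * 1024            # approximate size of one row
block_size = 8 * 1024 * 1024            # requested parquet.block.size

# Rows that would fit if the writer honored the block size from row one:
rows_wanted = max(1, block_size // row_bytes)

# The writer does not even check the accumulated size until it has buffered
# MINIMUM_RECORD_COUNT_FOR_CHECK rows, so the row group cannot be smaller:
MINIMUM_RECORD_COUNT_FOR_CHECK = 100
rows_actual = max(rows_wanted, MINIMUM_RECORD_COUNT_FOR_CHECK)

print(rows_wanted)                       # 1 row would already exceed 8 MiB
print(rows_actual * row_bytes // 2**30)  # the row group ends up ~4 GiB
```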
parquet.block.size works as expected as long as it is large enough to accommodate more than 100 rows.
I modified InternalParquetRecordWriter.java to set MINIMUM_RECORD_COUNT_FOR_CHECK = 2, which fixed the issue; however, there is no configuration value I can find that would support tuning this hardcoded constant.
Is there a different/better way to get row groups smaller than 100 rows?
This is a snippet of my code:
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, BinaryType
import numpy as np

def fake_row(x):
    # Integer division (//) keeps the shape an int under Python 3
    result = bytearray(np.random.randint(0, 127, (3 * 1024 * 1024 // 2), dtype=np.uint8).tobytes())
    return Row(result, result)
spark_session = SparkSession \
    .builder \
    .master('local[5]') \
    .appName("bbox2d_dataset_extraction") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "4g")

spark = spark_session.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)
index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])
bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe \
    .write.option("compression", "none") \
    .mode('overwrite') \
    .parquet('/tmp/huge/')
Unfortunately I haven't found a way to do so. I reported this issue upstream to have the hardcoded values removed and made configurable; I have a patch for it if you're interested.
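If you end up on a parquet-mr version that does expose these thresholds, they would be set the same way as parquet.block.size in the snippet above. The property names below come from ParquetOutputFormat's size-check settings and are an assumption to verify against your parquet-hadoop version:

```python
# Assumed property names (check ParquetOutputFormat in your parquet-hadoop jar):
# lower/upper bounds on how many rows the writer buffers before checking size.
sc._jsc.hadoopConfiguration().setInt("parquet.page.size.row.check.min", 2)
sc._jsc.hadoopConfiguration().setInt("parquet.page.size.row.check.max", 10)
```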