apache-spark amazon-s3 pyspark parquet hadoop-partitioning

Optimizing reading from partitioned parquet files in an S3 bucket


I have a large dataset in parquet format (~1TB in size) that is partitioned into 2 hierarchies: CLASS and DATE. There are only 7 classes, but the date is ever increasing from 2020-01-01 onwards. My data is partitioned by CLASS first and then by DATE.

So something like:

CLASS1---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

CLASS2---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

I load my data by CLASS in a for-loop. If I load the entire parquet dataset at once, YARN kills the job because it exceeds the memory of the instances. Within each CLASS I still have to load all the days, since my modeling involves a percentile calculation. This method takes about 23 hours to complete.

However, if I repartition so that I only have the CLASS partition, the job takes about 10 hours. Does having too many sub-partitions slow down the Spark executor jobs? I keep the partition hierarchy as CLASS -> DATE only because I need to append new data by DATE every day. If having only 1 partition level is more efficient, then I would have to repartition to just the CLASS partition every day after loading new data. Could someone explain why having a single partition level works faster? And if so, what would be the best way to partition the data on a daily basis by appending, without repartitioning the entire dataset?

Thank You

EDIT: I use the for loop on the file structure to loop by CLASS partition like so:

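import s3fs

fs = s3fs.S3FileSystem(anon=False)
inpath = "s3://bucket/file.parquet/"

# fs.ls returns the entries under inpath without the "s3://" scheme
dirs = fs.ls(inpath)
for path in dirs:
    custom_path = 's3://' + path + '/'
    # Directory names look like CLASS=1; "class" is a reserved word, so use class_id
    class_id = path.split('=')[1]
    df = spark.read.parquet(custom_path)
    outpath = "s3://bucket/Output_" + class_id + ".parquet"
    # Perform calculations
    df.write.mode('overwrite').parquet(outpath)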

On the first iteration, the loaded df has all the dates for CLASS=1. I then write the result as a separate parquet file for each CLASS, so that I end up with 7 parquet files:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

Merging the 7 parquets into a single parquet afterwards is not a problem, as the resulting parquet files are much smaller.


Solution

  • I have data partitioned by three columns: year, month, and id. The folder hierarchy is

    year=2020/month=08/id=1/*.parquet
    year=2020/month=08/id=2/*.parquet
    year=2020/month=08/id=3/*.parquet
    ...
    year=2020/month=09/id=1/*.parquet
    year=2020/month=09/id=2/*.parquet
    year=2020/month=09/id=3/*.parquet
    

    and I can read the DataFrame by loading the root path.

    val df = spark.read.parquet("s3://mybucket/")
    

    The partition columns are then added to the DataFrame automatically, so you can filter your data on them like this:

    val df_filtered = df.filter("year = '2020' and month = '09'")
    

    and do something with df_filtered. Because the filter is on partition columns, Spark reads only the matching partition directories instead of the whole dataset.


    For your repeated processing, you can use Spark's fair scheduler. Add a fair.xml file to src/main/resources of your project with the following content:

    <?xml version="1.0"?>
    
    <allocations>
        <pool name="fair">
            <schedulingMode>FAIR</schedulingMode>
            <weight>10</weight>
            <minShare>0</minShare>
        </pool>
    </allocations>
    

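    and enable the fair scheduler when creating the Spark session. The scheduling mode and the allocation file are read when the SparkContext starts, so they go on the session builder; only the pool is chosen per thread with setLocalProperty.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.scheduler.mode", "FAIR") // read at SparkContext startup
      .config("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
      .getOrCreate()
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair") // per-thread pool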
    

    Then you can run your jobs in parallel. You probably want to parallelize by CLASS, so

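    // On Scala 2.13+, .par needs the scala-parallel-collections module and
    // import scala.collection.parallel.CollectionConverters._
    val classes = (1 to 7).par
    val date = "2020-09-25" // String literals need double quotes in Scala

    classes.foreach { i =>

        val df_filtered = df.filter(s"CLASS = '$i' and DATE = '$date'")

        // Do your job

    }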
    

    The jobs for the different CLASS values will then run at the same time.
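
The snippets above are Scala, while the question uses PySpark. A minimal Python sketch of the same idea (read from the dataset root, filter on the partition columns, and submit one job per CLASS from separate driver threads) might look like the following. The helper names class_predicate, process_class and run_all are hypothetical, `spark` is assumed to be an existing SparkSession, and the column names CLASS and DATE are assumed to match the partition directory names.

```python
from concurrent.futures import ThreadPoolExecutor


def class_predicate(class_id, date=None):
    """Build a SQL filter on the partition columns CLASS and (optionally) DATE."""
    predicate = f"CLASS = '{class_id}'"
    if date is not None:
        predicate += f" and DATE = '{date}'"
    return predicate


def process_class(spark, root, class_id, date=None):
    """Read the partitioned dataset and keep a single CLASS (hypothetical helper).

    Filtering on partition columns lets Spark skip the other directories.
    """
    df = spark.read.parquet(root).filter(class_predicate(class_id, date))
    # ... per-class calculations and df.write would go here ...
    return df


def run_all(spark, root, class_ids, date=None, max_workers=4):
    """Call process_class once per CLASS, each from its own driver thread."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            c: pool.submit(process_class, spark, root, c, date) for c in class_ids
        }
        # Collect results keyed by class id; result() re-raises any worker error.
        return {c: f.result() for c, f in futures.items()}
```

Calling run_all(spark, "s3://bucket/file.parquet/", range(1, 8)) without a date keeps all dates for each CLASS, which is what the percentile calculation in the question needs. Since filter is lazy, the Spark jobs only overlap once each thread triggers an action such as a write.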