Tags: apache-spark, pyspark, apache-spark-sql, parquet, partition

How can I automate the process of running the same aggregation on 12 parquet files and then joining the results into one table using PySpark?


I have to make 6 different calculations (sums and averages by day) on a parquet file that contains 1 year of data at the day level. The problem is that the file is too big and Jupyter crashes in the process, so I divided the file into 12 months (12 parquet files). I tested whether the server could run the calculations on 1 month of data in a reasonable time, and it could. I want to avoid writing 72 different queries (6 calculations * 12 months). The result of each calculation has to be saved in a parquet file and then joined into a final table. How would you recommend automating this process in PySpark? I would appreciate any suggestions. Thanks.

This is an example of the code I have to run in each of the 12 parts of the data:

from pyspark.sql.functions import avg

# Read one month of data and register it as a temp view
month1 = spark.read.parquet("s3://af/my_folder/month1.parquet")
month1.createOrReplaceTempView("month1")

# Sum sessions per id and date, then persist the intermediate result
month1sum = spark.sql("select id, date, sum(sessions) as sum_num_sessions from month1 group by 1, 2 order by 1 asc")
month1sum.write.mode("overwrite").parquet("s3://af/my_folder/month1sum.parquet")
month1sum.createOrReplaceTempView("month1sum")

# Average the per-id sums by date and persist the monthly calculation
month_1_calculation = month1sum.groupBy('date').agg(avg('sum_num_sessions').alias('avg_sessions'))
month_1_calculation.write.mode("overwrite").parquet("s3://af/my_folder/month_1_calculation.parquet")





Solution

  • Quick approach: how about a for loop?

    from pyspark.sql.functions import avg

    for i in range(1, 13):
        # Read one month of data and register it as a temp view
        month = spark.read.parquet(f"s3://af/my_folder/month{i}.parquet")
        month.createOrReplaceTempView(f"month{i}")

        # Sum sessions per id and date, then persist the intermediate result
        monthsum = spark.sql(f"select id, date, sum(sessions) as sum_num_sessions from month{i} group by 1, 2 order by 1 asc")
        monthsum.write.mode("overwrite").parquet(f"s3://af/my_folder/month{i}sum.parquet")
        monthsum.createOrReplaceTempView(f"month{i}sum")

        # Average the per-id sums by date and persist the monthly calculation
        month_calculation = monthsum.groupBy('date').agg(avg('sum_num_sessions').alias('avg_sessions'))
        month_calculation.write.mode("overwrite").parquet(f"s3://af/my_folder/month_{i}_calculation.parquet")
    
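    To combine the 12 monthly outputs into one final table, you can read them back in a single pass, since spark.read.parquet accepts multiple paths. A minimal sketch, assuming all 12 calculation files share the same schema (the final_table name and output path are just examples):

    # Read every monthly calculation and stack them into one DataFrame
    final_table = spark.read.parquet(
        *[f"s3://af/my_folder/month_{i}_calculation.parquet" for i in range(1, 13)]
    )
    final_table.write.mode("overwrite").parquet("s3://af/my_folder/final_table.parquet")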

  • Long-term approach: Spark is designed to handle big data, so no matter how big your data is, as long as you have sufficient hardware (enough cores and memory), Spark should be able to take care of it with the correct configuration. Adjusting the number of cores, executor memory, driver memory, and the degree of parallelism (by changing the number of partitions), ... should resolve your issue.
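
    For example, here is a minimal sketch of passing such settings when building the SparkSession; the app name and the values below are placeholders to tune for your own cluster, not recommendations:

    from pyspark.sql import SparkSession

    # Placeholder resource settings -- adjust to your cluster's capacity
    spark = (
        SparkSession.builder
        .appName("yearly_aggregations")                  # hypothetical app name
        .config("spark.executor.memory", "8g")           # memory per executor
        .config("spark.driver.memory", "4g")             # memory for the driver
        .config("spark.executor.cores", "4")             # cores per executor
        .config("spark.sql.shuffle.partitions", "400")   # shuffle parallelism
        .getOrCreate()
    )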