apache-spark, pyspark, partitioning, parquet

PySpark - optimize number of partitions after parquet read


In a Parquet data lake partitioned by year and month, with spark.default.parallelism set to, say, 4, suppose I want to create a DataFrame comprising months 11-12 of 2017 and months 1-3 of 2018 from two sources, A and B:

df = spark.read.parquet(
    "A.parquet/_YEAR={2017}/_MONTH={11,12}",
    "A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
    "B.parquet/_YEAR={2017}/_MONTH={11,12}",
    "B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)

If I check the number of partitions, I see that Spark used spark.default.parallelism as the default:

df.rdd.getNumPartitions()
Out[4]: 4
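
For reference, this matches the context's default parallelism (a quick sanity check, using the same spark session as above):

spark.sparkContext.defaultParallelism
# 4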

After creating df I need to perform join and groupBy operations over each period, and the data is distributed more or less evenly across periods (around 10 million rows per period).
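
For concreteness, the downstream work looks roughly like this sketch (the lookup table and the key / amount columns are made-up placeholders, not the real schema):

from pyspark.sql import functions as F

# hypothetical lookup table and column names, purely for illustration
lookup = spark.createDataFrame([(1, "x"), (2, "y")], ["key", "label"])

per_period = (
    df.join(lookup, "key")                        # join within each period's data
      .groupBy("_YEAR", "_MONTH")                 # one group per period
      .agg(F.sum("amount").alias("total_amount"))
)

With that in mind: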

Question

  • Will a repartition improve the performance of my subsequent operations?
  • If so, if I have 10 different periods (5 in each of A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?

Solution

  • Will a repartition improve the performance of my subsequent operations?

    Typically it won't. The only reason to preemptively repartition data is to avoid further shuffling when the same Dataset is used for multiple joins based on the same join condition.

    If so, if I have 10 different periods (5 in each of A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?

    Let's go step-by-step:

    • should I repartition by the number of periods

      Partitioners don't guarantee a 1:1 relationship between distinct key values and partitions. The only thing to remember is that you cannot have more non-empty partitions than unique keys, so using a significantly larger value doesn't make sense (see the sketch after this list).

    • and explicitly reference the columns to repartition

      Yes. Repartitioning and then joining or grouping on the same set of columns, for both sides of the join, is the only sensible approach.
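
    A minimal, self-contained sketch of the first point (synthetic data; the column key is just a stand-in for the real partition columns, and spark is the same session as above): hash-partitioning 5 distinct values into 50 partitions leaves at most 5 of them non-empty.

      from pyspark.sql import functions as F

      # 5 distinct key values, but 50 requested partitions
      demo = spark.range(1000000).withColumn("key", F.col("id") % 5)
      demo_ = demo.repartition(50, "key")

      demo_.rdd.getNumPartitions()
      # 50 -- the partitions exist, but most of them are empty

      # count only the partitions that actually contain rows
      demo_.rdd.mapPartitions(lambda it: [1] if next(it, None) is not None else []).count()
      # at most 5 (hash collisions can make it even fewer)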

    Summary

    Repartitioning before a join makes sense in two scenarios:

    • In case of multiple subsequent joins

      df_ = df.repartition(10, "foo", "bar")
      df_.join(df1, ["foo", "bar"])
      ...
      df_.join(df2, ["foo", "bar"])
      
    • With a single join, when the desired number of output partitions is different from spark.sql.shuffle.partitions (and there is no broadcast join)

      spark.conf.get("spark.sql.shuffle.partitions")
      # 200
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
      
      df1_ = df1.repartition(11, "foo", "bar")
      df2_ = df2.repartition(11, "foo", "bar")
      
      df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
      # 11
      
      df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
      # 200
      

      which might be preferable to temporarily changing the global setting:

      spark.conf.set("spark.sql.shuffle.partitions", 11)
      df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
      spark.conf.set("spark.sql.shuffle.partitions", 200)
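
    The same reasoning applies to groupBy: if a DataFrame is already hash-partitioned on the grouping columns, the aggregation adds no extra shuffle and keeps the chosen partition count. A short sketch, reusing the hypothetical df1 / df1_ from above and assuming the same configuration as the previous snippets:

      df1_.groupBy("foo", "bar").count().rdd.getNumPartitions()
      # 11

      df1.groupBy("foo", "bar").count().rdd.getNumPartitions()
      # 200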