Increase parallelism of reading a parquet file - Spark optimize self join

I want to perform a self join to generate candidate matching pairs. Currently the operation is far too slow to be usable. Unfortunately, I cannot broadcast the data frames because they are too large.

First I aggregate to reduce the number of tuples:

import org.apache.spark.sql.functions.{col, sum}

val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated
  .repartition(7, col("discrete_foo"), col("discrete_bar"))
  .sortWithinPartitions("discrete_foo", "discrete_bar", "baz")
  .write.option("compression", "gzip").mode("overwrite").parquet("path/to/file/aggregated_stuff")

This works just fine and is fast. Then, I want to perform a self join to generate candidates. I already observed that I need to generate more parallelism:

--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \

So both the default and the shuffle parallelism are increased. Additionally, I tried to coarsen both discrete values (i.e. increase the number of items that fall into a discrete block) and thereby reduce the number of tuples. Still no luck. So I additionally tried to force a larger number of tasks by repartitioning:

import org.apache.spark.sql.functions.col

val materializedAggregated = spark.read.parquet("path/to/file/aggregated_stuff")
val selfB = materializedAggregated
  .withColumnRenamed("baz", "other_baz")
  .withColumnRenamed("value", "other_value")

val candidates = materializedAggregated
  .join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
  .filter(col("baz") =!= col("other_baz")) // drop pairs of a record with itself

However, this does not help either and is still far too slow. What else can I do to make this query run faster? Is there something I am missing?

Below you will see various failing attempts trying to increase the parallelism when reading the data for the self join.

I even set:

--conf spark.sql.files.maxPartitionBytes=16777216 \

i.e. to 1/8 of the default (16 MB instead of 128 MB). Still, the number of tasks generated is far too small: only 250.

some details

The execution plan:

Even without this manual repartition it is way too slow, and I fear not enough partitions are created:

Even fewer tasks are processed, which will most likely make it slower:

How can I make sure that this initial step has a higher parallelism? Could bucketing help? Since the shuffled data is read only once, I would not expect it to yield a real speed-up, right? What about the repartition step when writing the aggregated files? Should I set a higher number there? So far, even when omitting it (and basically recomputing the aggregation twice), the number of tasks does not increase beyond 260.


I use Spark 2.3.x on HDP 3.1.


  • The maximum number of tasks from your inner join that actually receive data will be equal to the number of distinct join keys (i.e. their cardinality), irrespective of the settings for spark.sql.shuffle.partitions and spark.default.parallelism.

    This is because in the SortMergeJoin, data will be shuffled using the hash of the join key. All data for each distinct join key will go to a single partition, and therefore to a single task.

    The problem therefore is that you do not have enough bins: they are too coarse. The maximum number of busy tasks you will see will be equal to the number of bins.

    If you bin your data with more granularity, you should see the number of tasks increase.
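The bound described above can be illustrated with a small plain-Scala simulation that needs no Spark at all. This is only a sketch under stated assumptions: `partitionOf` is a hypothetical stand-in for Spark's hash partitioner (Spark uses its own Murmur3-based hash, not `hashCode`), and the bin counts (10 x 10 versus 100 x 100) are made up for illustration. Whatever the hash function, rows with the same key always land in the same partition, so the number of non-empty partitions can never exceed the number of distinct keys.

```scala
object JoinKeyParallelism {
  // Hypothetical stand-in for a hash partitioner: maps a join key
  // (discrete_foo, discrete_bar) to one of numPartitions shuffle partitions.
  def partitionOf(key: (Int, Int), numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Number of shuffle partitions that receive at least one row.
  def nonEmptyPartitions(keys: Seq[(Int, Int)], numPartitions: Int): Int =
    keys.map(partitionOf(_, numPartitions)).distinct.size

  def main(args: Array[String]): Unit = {
    val numPartitions = 4000 // mirrors spark.sql.shuffle.partitions=4000

    // Coarse binning: 10 x 10 = 100 distinct keys, 100 rows per key.
    val coarse = for (foo <- 0 until 10; bar <- 0 until 10; _ <- 0 until 100)
      yield (foo, bar)

    // Finer binning: 100 x 100 = 10000 distinct keys, 1 row per key.
    val fine = for (foo <- 0 until 100; bar <- 0 until 100)
      yield (foo, bar)

    println(s"coarse: ${coarse.size} rows, busy partitions = " +
      nonEmptyPartitions(coarse, numPartitions))
    println(s"fine:   ${fine.size} rows, busy partitions = " +
      nonEmptyPartitions(fine, numPartitions))
  }
}
```

Both inputs contain the same number of rows, yet the coarse binning can keep at most 100 of the 4000 partitions busy, while the finer binning can spread over far more of them. On the real data, counting the distinct (discrete_foo, discrete_bar) pairs gives the same upper bound for the join stage.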