apache-spark apache-spark-sql palantir-foundry foundry-code-repositories foundry-code-workbooks

How do I make my highly skewed join complete in Spark SQL?


My join is executing as follows:

SELECT
  l.*,
  r.*
FROM `/foo/bar/baz` AS l
JOIN `/foo2/bar2/baz2` AS r
ON l.something = r.something

Dataset: /foo/bar/baz

+-----------+-------+
| something | val_1 |
+-----------+-------+
| a         |     1 |
| a         |     2 |
| a         |     3 |
| a         |     4 |
| a         |     5 |
| a         |     6 |
| a         |   ... |
| a         |   10K |
| b         |     1 |
| b         |     2 |
| b         |     3 |
+-----------+-------+

Dataset: /foo2/bar2/baz2

+-----------+-------+
| something | val_2 |
+-----------+-------+
| a         |     1 |
| a         |     2 |
| b         |     1 |
| b         |     2 |
| b         |     3 |
+-----------+-------+

I am getting OOM errors on my executors and I don't want to throw more memory at the executors unnecessarily. How do I ensure this join executes successfully without burning extra resources?


Solution

  • Salting the Join

    One tactic for getting this join to execute successfully is what's known as salting the join.

    Salted joins operate in Spark by splitting the table with many entries per key into smaller portions while exploding the smaller table into an equivalent number of copies. This produces the same output as a normal join, but with smaller tasks for the larger table and thus a lower risk of OOM errors. You salt a join by adding a column of random integers from 0 through N - 1 to the left table and making N copies of the right table, one per salt value. If you add the new salt column to the join condition, you reduce the largest bucket to roughly 1/N of its previous size.

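    The secret is the EXPLODE function. EXPLODE emits one output row per array element, so exploding an N-element array is effectively a cross-product of the table with the N salt values: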

    SELECT
      l.*,
      r.*
    FROM
      (
        -- Assign each left row a random salt in 0..7.
        SELECT
          *,
          FLOOR(RAND() * 8) AS salt
        FROM `/foo/bar/baz`
      ) AS l
    JOIN
      (
        -- Emit 8 copies of each right row, one per salt value 0..7.
        SELECT
          *,
          EXPLODE(ARRAY(0,1,2,3,4,5,6,7)) AS salt
        FROM `/foo2/bar2/baz2`
      ) AS r
    ON
      l.something = r.something
      AND l.salt = r.salt
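
To see why the salted query returns the same rows as the plain join, here is a minimal plain-Python sketch that mimics both joins on in-memory lists. It is only an illustration, not Spark code: the helper names `plain_join` and `salted_join`, the fixed seed, and the shrunken tables (1,000 rows for key "a" instead of 10K) are assumptions made for this example.

```python
import random
from collections import Counter


def plain_join(left, right):
    """Inner join of (key, val_1) rows with (key, val_2) rows on key."""
    return sorted((k, v1, v2) for k, v1 in left for k2, v2 in right if k == k2)


def salted_join(left, right, n, seed=0):
    """Same join, but matched on (key, salt) with n salt values."""
    rng = random.Random(seed)
    # Mirrors FLOOR(RAND() * n): random() is in [0, 1), so the salt is 0..n-1.
    salted_left = [(k, int(rng.random() * n), v1) for k, v1 in left]
    # Mirrors EXPLODE(ARRAY(0, ..., n-1)): one copy of each right row per salt.
    exploded_right = [(k, s, v2) for k, v2 in right for s in range(n)]
    joined = sorted(
        (k, v1, v2)
        for k, s, v1 in salted_left
        for k2, s2, v2 in exploded_right
        if k == k2 and s == s2
    )
    # Number of left rows that land in each (key, salt) join bucket.
    buckets = Counter((k, s) for k, s, _ in salted_left)
    return joined, buckets


left = [("a", i) for i in range(1, 1001)] + [("b", i) for i in range(1, 4)]
right = [("a", 1), ("a", 2), ("b", 1), ("b", 2), ("b", 3)]

joined, buckets = salted_join(left, right, n=8)
print(joined == plain_join(left, right))  # same rows as the unsalted join
print(max(buckets.values()))              # largest bucket, roughly 1000 / 8
```

Every left row has exactly one salt and every right row exists once per salt, so each original match survives exactly once. Only the grouping changes: the 1,000 rows for key "a" are spread over 8 buckets instead of sitting in one.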
    

    Tuning

    • How do you choose the factor to explode by? Educated guessing, mostly. Powers of 2 are a good way to find the right ballpark: 8, 16, 32.
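    • A more informed approach is to look at the row count per executor as your unsalted job is running: an executor that handles far more rows than its peers shows how badly the hot key is skewed, and therefore roughly how many pieces it needs to be split into.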
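
The educated guess can be made slightly more systematic. The sketch below doubles the factor until the hottest key would fit under a target bucket size. This is a hypothetical heuristic rather than anything Spark provides: `suggest_salt_factor`, `target_rows_per_bucket`, and `max_factor` are names invented for this example, and the target would have to come from what your executors can actually handle. In practice the per-key counts would come from a GROUP BY on the join key rather than a Python list.

```python
from collections import Counter


def suggest_salt_factor(keys, target_rows_per_bucket, max_factor=1024):
    """Smallest power of two that brings the hottest key under the target."""
    hottest = max(Counter(keys).values())
    factor = 1
    # Double the factor until the hottest key's share per salt is small enough.
    while hottest / factor > target_rows_per_bucket and factor < max_factor:
        factor *= 2
    return factor


# Key distribution of /foo/bar/baz from the question: 10K rows for "a", 3 for "b".
keys = ["a"] * 10_000 + ["b"] * 3
print(suggest_salt_factor(keys, target_rows_per_bucket=1_500))  # -> 8
```

With a target of 1,500 rows, factors 1, 2 and 4 leave the "a" bucket at 10,000, 5,000 and 2,500 rows, so the first factor that fits is 8 (1,250 rows).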

    What to Watch Out For

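    • Make sure you don't make off-by-one errors when salting a join. A salt value that exists on one side but not the other has nothing to match, so the join silently drops a fraction of your records.
    • RAND() returns values in [0, 1), so FLOOR(RAND() * N) gives you integers from 0 to N - 1, while CEIL(RAND() * N) gives you integers from 1 to N (and 0 only in the rare case that RAND() returns exactly 0). Make sure you explode the matching set of numbers in your salted join!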
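
The cost of a mismatch is easy to estimate. This plain-Python sketch uses 10,000 evenly spaced values as a stand-in for RAND() and measures what fraction of left rows would find a partner for different salt and explode combinations. The helper names are assumptions made for this illustration.

```python
import math


def floor_salt(u, n):
    return math.floor(u * n)


def ceil_salt(u, n):
    return math.ceil(u * n)


def matched_fraction(n, left_salt, exploded):
    """Fraction of draws from [0, 1) whose salt exists on the exploded side."""
    draws = [i / 10_000 for i in range(10_000)]  # stand-in for RAND()
    salts = [left_salt(u, n) for u in draws]
    return sum(s in exploded for s in salts) / len(salts)


n = 8
print(matched_fraction(n, floor_salt, set(range(n))))         # FLOOR with 0..7 -> 1.0
print(matched_fraction(n, floor_salt, set(range(1, n + 1))))  # FLOOR with 1..8 -> 0.875
print(matched_fraction(n, ceil_salt, set(range(n))))          # CEIL with 0..7  -> 0.8751
```

With N = 8, a mismatched pair loses about one eighth of the left table, and nothing in the query fails to warn you about it.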

    The Overhead of Salt

    • Salting a join does not necessarily make your build faster. It just gives it a better chance of succeeding.
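    • If you salt your joins unnecessarily, you may actually start seeing declines in performance, since the exploded table is N times larger and all of those extra rows have to be shuffled and joined.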
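
A back-of-the-envelope sketch of that overhead: the salted table only gains a column, while the exploded table is copied once per salt value. The function name and the row counts below are hypothetical, chosen only to show how the input to the join grows with the factor.

```python
def rows_entering_join(left_rows, right_rows, salt_factor):
    # Left side keeps its row count; right side is replicated salt_factor times.
    return left_rows + salt_factor * right_rows


# Hypothetical sizes: 10M rows on the salted side, 1M rows on the exploded side.
for factor in (1, 8, 64):
    print(factor, rows_entering_join(10_000_000, 1_000_000, factor))
```

For the tiny right-hand table in the question the extra copies are negligible, but when the exploded side is itself large, a high factor can cost more than the skew it removes.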