Tags: apache-spark, pyspark, optimization

Optimize multiple joins with same conditions in PySpark


I have a large dataset A of about 3 billion rows and 40 columns. I want to join dataset A with dataset B (30 million rows) and dataset C (60 million rows) to pull one or two columns from each of them.

The relationship between dataset A and dataset B is many-to-many. The same goes for A and C.

The join condition is below:

join_conditions_A_B = [
    A.user_id == B.b_user_id,
    A.log_id == B.b_log_id
]

join_conditions_A_C = [
    A.user_id == C.c_user_id,
    A.log_id == C.c_log_id
]

As you can see, the join conditions are similar for both datasets. My join logic is below.

import pyspark.sql.functions as F

# rename the join keys so they don't clash with A's columns after the join
B = B.select(
    F.col("user_id").alias("b_user_id"),
    F.col("log_id").alias("b_log_id"),
    "column_to_be_added_to_A"
)

C = C.select(
    F.col("user_id").alias("c_user_id"),
    F.col("log_id").alias("c_log_id"),
    "column_to_be_added_to_A_1",
    "column_to_be_added_to_A_2"
)

A = A.join(B, on=join_conditions_A_B, how="left")
A = A.join(C, on=join_conditions_A_C, how="left")

The job has been running for 23 hours so far and still has not finished.

My question is: Is there any way I can optimize these join operations?

The join conditions are almost identical, so maybe both joins can be performed in one pass so the data does not get shuffled repeatedly.


Solution

  • 30M or 60M rows with only a few relevant columns can often still fit in executor memory, so you can try broadcast joins (note the required import):

    from pyspark.sql.functions import broadcast

    A = A.join(broadcast(B), on=join_conditions_A_B, how="left")
    A = A.join(broadcast(C), on=join_conditions_A_C, how="left")
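
  • If B or C turn out to be too large to broadcast, another option is to hash-partition all three DataFrames on the join keys up front, so the two sort-merge joins can reuse a single shuffle of A instead of reshuffling it twice. The sketch below is only an illustration of that idea, not a drop-in fix: NUM_PARTITIONS is a hypothetical value you would tune to your cluster and data volume, and you should verify with .explain() that the second join does not add another exchange over A.

    # Hypothetical partition count -- tune to data volume and cluster size.
    NUM_PARTITIONS = 2000

    # Hash-partition every DataFrame on its join keys with the same number of
    # partitions, so Spark can reuse this layout for both joins.
    A = A.repartition(NUM_PARTITIONS, "user_id", "log_id")
    B = B.repartition(NUM_PARTITIONS, "b_user_id", "b_log_id")
    C = C.repartition(NUM_PARTITIONS, "c_user_id", "c_log_id")

    A = A.join(B, on=join_conditions_A_B, how="left")
    A = A.join(C, on=join_conditions_A_C, how="left")

    # Inspect the physical plan: ideally only one shuffle of A appears.
    A.explain()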