Search code examples
apache-sparkjoinpysparkdata-manipulationcross-join

crossJoin two Spark dataframes not using crossJoin


Let's assume I have two Spark data frames:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Example data for DataFrame 1
data1 = [
    ("Pool_A", "A", "X", 10),
    ("Pool_A", "A", "Y", 20),
    ("Pool_A", "B", "X", 15),
    ("Pool_B", "A", "X", 5),
    ("Pool_B", "B", "Y", 25),
]

# Define the schema for DataFrame 1
df1_schema = ["pool", "col1", "col2", "value"]

# Create DataFrame 1
df1 = spark.createDataFrame(data1, df1_schema)

# Example data for DataFrame 2
data2 = [
    ("A", "X", 100),
    ("A", "Y", 200),
    ("B", "X", 150),
    ("B", "Y", 250),
    ("C", "X", 300),
]

# Define the schema for DataFrame 2
df2_schema = ["col1", "col2", "default_value"]

# Create DataFrame 2
df2 = spark.createDataFrame(data2, df2_schema)

I want to join the two dataframes by propagating all possible combinations of "col1", "col2" for each "pool" and have the default "value" associated with it. I have a solution using a crossJoin, but wanted to see if there other elegant solutions (+cost of performance of using the crossJoin)

This is the desired output:

+-------+----+----+-----+
|   pool|col1|col2|value|
+-------+----+----+-----+
| Pool_B|   A|   X|    5|
| Pool_B|   B|   Y|   25|
| Pool_B|   C|   X|  300|
| Pool_B|   B|   X|  150|
| Pool_B|   A|   Y|  200|
| Pool_A|   A|   X|   10|
| Pool_A|   B|   X|   15|
| Pool_A|   A|   Y|   20|
| Pool_A|   B|   Y|  250|
| Pool_A|   C|   X|  300|
+-------+----+----+-----+

Solution

  • In big data distributed computing, there's really no other way apart from crossJoin to get all the combinations of two different dataframes. But before that, you will want to make a small dataframe with only "pools".

    After the crossJoin we can join values from df1 and fill in gaps (nulls) with default values using coalesce.

    from pyspark.sql import functions as F
    
    df_pools = df1.select('pool').distinct()
    df_comb = df_pools.crossJoin(df2)
    df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
    df_coalesced = df_joined.select(
        'pool', 'col1', 'col2',
        F.coalesce('value', 'default_value').alias('value')
    )
    df_coalesced.show()
    # +------+----+----+-----+
    # |  pool|col1|col2|value|
    # +------+----+----+-----+
    # |Pool_B|   A|   Y|  200|
    # |Pool_A|   A|   X|   10|
    # |Pool_B|   A|   X|    5|
    # |Pool_A|   A|   Y|   20|
    # |Pool_A|   B|   Y|  250|
    # |Pool_B|   B|   X|  150|
    # |Pool_A|   B|   X|   15|
    # |Pool_A|   C|   X|  300|
    # |Pool_B|   B|   Y|   25|
    # |Pool_B|   C|   X|  300|
    # +------+----+----+-----+
    

    That being said, if you are sure that the number of values in the "pool" column is not too big, you can extract the values from the dataframe as a list (into the driver) and send the list to executors.

    pools = [x[0] for x in df1.select('pool').distinct().collect()]
    df_comb = df2.withColumn('pool', F.explode(F.array(*[F.lit(x) for x in pools])))
    df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
    df_coalesced = df_joined.select(
        'pool', 'col1', 'col2',
        F.coalesce('value', 'default_value').alias('value')
    )
    df_coalesced.show()
    # +------+----+----+-----+
    # |  pool|col1|col2|value|
    # +------+----+----+-----+
    # |Pool_B|   A|   Y|  200|
    # |Pool_A|   A|   X|   10|
    # |Pool_B|   A|   X|    5|
    # |Pool_A|   A|   Y|   20|
    # |Pool_A|   B|   Y|  250|
    # |Pool_B|   B|   X|  150|
    # |Pool_A|   B|   X|   15|
    # |Pool_A|   C|   X|  300|
    # |Pool_B|   B|   Y|   25|
    # |Pool_B|   C|   X|  300|
    # +------+----+----+-----+
    

    Note: in Spark 3.4+, instead of F.array(*[F.lit(x) for x in pools]) you can use F.lit(pools)

    Such approach would avoid crossJoin.

    Query plan using crossJoin:

    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- Project [pool#899, col1#907, col2#908, coalesce(value#921L, default_value#909L) AS value#927L]
       +- SortMergeJoin [pool#899, col1#907, col2#908], [pool#918, col1#919, col2#920], LeftOuter
          :- Sort [pool#899 ASC NULLS FIRST, col1#907 ASC NULLS FIRST, col2#908 ASC NULLS FIRST], false, 0
          :  +- Exchange hashpartitioning(pool#899, col1#907, col2#908, 200), ENSURE_REQUIREMENTS, [plan_id=4569]
          :     +- CartesianProduct
          :        :- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
          :        :  +- Exchange hashpartitioning(pool#899, 200), ENSURE_REQUIREMENTS, [plan_id=4564]
          :        :     +- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
          :        :        +- Project [pool#899]
          :        :           +- Scan ExistingRDD[pool#899,col1#900,col2#901,value#902L]
          :        +- Scan ExistingRDD[col1#907,col2#908,default_value#909L]
          +- Sort [pool#918 ASC NULLS FIRST, col1#919 ASC NULLS FIRST, col2#920 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(pool#918, col1#919, col2#920, 200), ENSURE_REQUIREMENTS, [plan_id=4570]
                +- Filter ((isnotnull(pool#918) AND isnotnull(col1#919)) AND isnotnull(col2#920))
                   +- Scan ExistingRDD[pool#918,col1#919,col2#920,value#921L]
    

    Query plan without crossJoin (i.e. sending the list to executors):

    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- Project [pool#1379, col1#1371, col2#1372, coalesce(value#1366L, default_value#1373L) AS value#1389L]
       +- SortMergeJoin [pool#1379, col1#1371, col2#1372], [pool#1363, col1#1364, col2#1365], LeftOuter
          :- Sort [pool#1379 ASC NULLS FIRST, col1#1371 ASC NULLS FIRST, col2#1372 ASC NULLS FIRST], false, 0
          :  +- Exchange hashpartitioning(pool#1379, col1#1371, col2#1372, 200), ENSURE_REQUIREMENTS, [plan_id=6619]
          :     +- Generate explode([Pool_A,Pool_B]), [col1#1371, col2#1372, default_value#1373L], false, [pool#1379]
          :        +- Scan ExistingRDD[col1#1371,col2#1372,default_value#1373L]
          +- Sort [pool#1363 ASC NULLS FIRST, col1#1364 ASC NULLS FIRST, col2#1365 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(pool#1363, col1#1364, col2#1365, 200), ENSURE_REQUIREMENTS, [plan_id=6620]
                +- Filter ((isnotnull(pool#1363) AND isnotnull(col1#1364)) AND isnotnull(col2#1365))
                   +- Scan ExistingRDD[pool#1363,col1#1364,col2#1365,value#1366L]