Let's assume I have two Spark data frames:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Example data for DataFrame 1
data1 = [
    ("Pool_A", "A", "X", 10),
    ("Pool_A", "A", "Y", 20),
    ("Pool_A", "B", "X", 15),
    ("Pool_B", "A", "X", 5),
    ("Pool_B", "B", "Y", 25),
]
# Define the schema for DataFrame 1
df1_schema = ["pool", "col1", "col2", "value"]
# Create DataFrame 1
df1 = spark.createDataFrame(data1, df1_schema)
# Example data for DataFrame 2
data2 = [
    ("A", "X", 100),
    ("A", "Y", 200),
    ("B", "X", 150),
    ("B", "Y", 250),
    ("C", "X", 300),
]
# Define the schema for DataFrame 2
df2_schema = ["col1", "col2", "default_value"]
# Create DataFrame 2
df2 = spark.createDataFrame(data2, df2_schema)
I want to join the two dataframes by propagating all possible combinations of "col1" and "col2" to each "pool", with the default "value" filled in where df1 has no specific value. I have a solution using a crossJoin, but wanted to see if there are other, more elegant solutions (and what the performance cost of using crossJoin is).
This is the desired output:
+-------+----+----+-----+
| pool|col1|col2|value|
+-------+----+----+-----+
| Pool_B| A| X| 5|
| Pool_B| B| Y| 25|
| Pool_B| C| X| 300|
| Pool_B| B| X| 150|
| Pool_B| A| Y| 200|
| Pool_A| A| X| 10|
| Pool_A| B| X| 15|
| Pool_A| A| Y| 20|
| Pool_A| B| Y| 250|
| Pool_A| C| X| 300|
+-------+----+----+-----+
In distributed big-data computing, there is really no other way apart from a crossJoin to get all the combinations of rows from two different dataframes. But before that, you will want to make a small dataframe containing only the distinct "pool" values. After the crossJoin, we can left-join the values from df1 and fill in the gaps (nulls) with the default values using coalesce.
from pyspark.sql import functions as F
df_pools = df1.select('pool').distinct()
df_comb = df_pools.crossJoin(df2)
df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
df_coalesced = df_joined.select(
    'pool', 'col1', 'col2',
    F.coalesce('value', 'default_value').alias('value')
)
df_coalesced.show()
# +------+----+----+-----+
# | pool|col1|col2|value|
# +------+----+----+-----+
# |Pool_B| A| Y| 200|
# |Pool_A| A| X| 10|
# |Pool_B| A| X| 5|
# |Pool_A| A| Y| 20|
# |Pool_A| B| Y| 250|
# |Pool_B| B| X| 150|
# |Pool_A| B| X| 15|
# |Pool_A| C| X| 300|
# |Pool_B| B| Y| 25|
# |Pool_B| C| X| 300|
# +------+----+----+-----+
That being said, if you are sure that the number of distinct values in the "pool" column is not too big, you can collect those values from the dataframe into the driver as a list and send the list to the executors.
pools = [x[0] for x in df1.select('pool').distinct().collect()]
df_comb = df2.withColumn('pool', F.explode(F.array(*[F.lit(x) for x in pools])))
df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
df_coalesced = df_joined.select(
    'pool', 'col1', 'col2',
    F.coalesce('value', 'default_value').alias('value')
)
df_coalesced.show()
# +------+----+----+-----+
# | pool|col1|col2|value|
# +------+----+----+-----+
# |Pool_B| A| Y| 200|
# |Pool_A| A| X| 10|
# |Pool_B| A| X| 5|
# |Pool_A| A| Y| 20|
# |Pool_A| B| Y| 250|
# |Pool_B| B| X| 150|
# |Pool_A| B| X| 15|
# |Pool_A| C| X| 300|
# |Pool_B| B| Y| 25|
# |Pool_B| C| X| 300|
# +------+----+----+-----+
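If the cardinality of "pool" is not known up front, you may want to guard the collect. A minimal sketch, assuming an arbitrary threshold (the 10,000 here is just a placeholder, not a Spark limit):
# Sketch: only collect the pool values to the driver when their distinct
# count is small enough; the 10_000 threshold is an arbitrary assumption.
n_pools = df1.select('pool').distinct().count()
if n_pools > 10_000:
    raise ValueError(f"{n_pools} distinct pools is too many to collect to the driver")
pools = [x[0] for x in df1.select('pool').distinct().collect()]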
Note: in Spark 3.4+, instead of F.array(*[F.lit(x) for x in pools]) you can use F.lit(pools). This approach avoids the crossJoin.
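For completeness, a minimal sketch of that Spark 3.4+ variant (only the literal array construction changes; the rest of the pipeline stays the same):
# Spark 3.4+: F.lit accepts a Python list, so the explicit F.array of
# individual literals is no longer needed
df_comb = df2.withColumn('pool', F.explode(F.lit(pools)))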
Query plan using crossJoin:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [pool#899, col1#907, col2#908, coalesce(value#921L, default_value#909L) AS value#927L]
   +- SortMergeJoin [pool#899, col1#907, col2#908], [pool#918, col1#919, col2#920], LeftOuter
      :- Sort [pool#899 ASC NULLS FIRST, col1#907 ASC NULLS FIRST, col2#908 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(pool#899, col1#907, col2#908, 200), ENSURE_REQUIREMENTS, [plan_id=4569]
      :     +- CartesianProduct
      :        :- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
      :        :  +- Exchange hashpartitioning(pool#899, 200), ENSURE_REQUIREMENTS, [plan_id=4564]
      :        :     +- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
      :        :        +- Project [pool#899]
      :        :           +- Scan ExistingRDD[pool#899,col1#900,col2#901,value#902L]
      :        +- Scan ExistingRDD[col1#907,col2#908,default_value#909L]
      +- Sort [pool#918 ASC NULLS FIRST, col1#919 ASC NULLS FIRST, col2#920 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(pool#918, col1#919, col2#920, 200), ENSURE_REQUIREMENTS, [plan_id=4570]
            +- Filter ((isnotnull(pool#918) AND isnotnull(col1#919)) AND isnotnull(col2#920))
               +- Scan ExistingRDD[pool#918,col1#919,col2#920,value#921L]
Query plan without crossJoin (i.e. sending the list to the executors):
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [pool#1379, col1#1371, col2#1372, coalesce(value#1366L, default_value#1373L) AS value#1389L]
   +- SortMergeJoin [pool#1379, col1#1371, col2#1372], [pool#1363, col1#1364, col2#1365], LeftOuter
      :- Sort [pool#1379 ASC NULLS FIRST, col1#1371 ASC NULLS FIRST, col2#1372 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(pool#1379, col1#1371, col2#1372, 200), ENSURE_REQUIREMENTS, [plan_id=6619]
      :     +- Generate explode([Pool_A,Pool_B]), [col1#1371, col2#1372, default_value#1373L], false, [pool#1379]
      :        +- Scan ExistingRDD[col1#1371,col2#1372,default_value#1373L]
      +- Sort [pool#1363 ASC NULLS FIRST, col1#1364 ASC NULLS FIRST, col2#1365 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(pool#1363, col1#1364, col2#1365, 200), ENSURE_REQUIREMENTS, [plan_id=6620]
            +- Filter ((isnotnull(pool#1363) AND isnotnull(col1#1364)) AND isnotnull(col2#1365))
               +- Scan ExistingRDD[pool#1363,col1#1364,col2#1365,value#1366L]
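For reference, physical plans like the ones above can be printed with DataFrame.explain(); presumably something along these lines was used:
# Print the physical plan of the final DataFrame; explain(True) or
# explain(mode='formatted') give more detailed output.
df_coalesced.explain()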