Search code examples
Tags: pyspark, palantir-foundry, foundry-code-repositories, foundry-python-transform

Why is my build hanging / taking a long time to generate my query plan with many unions?


I notice that when I run the same code as in my earlier example, but with a `union`, `unionByName`, or `unionAll` in place of the join, query planning takes significantly longer and can result in a driver OOM.

Code included here for reference, with a slight difference to what occurs inside the for() loop.

from pyspark.sql import types as T, functions as F, SparkSession
# Reuse the active Spark session, or start one if none is running yet.
spark = SparkSession.builder.getOrCreate()

# Left-hand input: two integer columns and two float measures, none nullable.
schema = (
  T.StructType()
  .add("col_1", T.IntegerType(), False)
  .add("col_2", T.IntegerType(), False)
  .add("measure_1", T.FloatType(), False)
  .add("measure_2", T.FloatType(), False)
)
data = [
  dict(col_1=1, col_2=2, measure_1=0.5, measure_2=1.5),
  dict(col_1=2, col_2=3, measure_1=2.5, measure_2=3.5),
]

df = spark.createDataFrame(data=data, schema=schema)

# Right-hand input: a single non-nullable key column holding each key twice.
right_schema = T.StructType().add("col_1", T.IntegerType(), False)
right_data = [{"col_1": key} for key in (1, 1, 2, 2)]
right_df = spark.createDataFrame(data=right_data, schema=right_schema)

# Union df with itself, then join the result to right_df on col_1.
df = df.unionByName(df).join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

# Print the physical plan of the unioned-and-joined DataFrame (output shown below).
df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
# Placeholder tag column so the base rows carry the same columns as the tagged rows unioned below.
df = df.withColumn("found_filter", F.lit(None))
# NOTE: df is reassigned on every pass, so each later filter also runs over the
# rows unioned in by earlier passes. This is the iterative-union pattern in question.
for filter_col in filter_union_cols:
  below_one = F.col(filter_col) < 1
  # Overwrite the placeholder with the name of the column the rows were filtered on.
  tagged = df.filter(below_one).withColumn("found_filter", F.lit(filter_col))
  df = df.unionByName(tagged)

df.show()

"""
+-----+-----+---------+---------+------------+                                  
|col_1|col_2|measure_1|measure_2|found_filter|
+-----+-----+---------+---------+------------+
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
+-----+-----+---------+---------+------------+
"""

# Print the physical plan after the loop of unions.
df.explain()

# REALLY long query plan: the output below has one SortMergeJoin branch per unioned DataFrame.

"""
== Physical Plan ==
Union
:- *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, null AS found_filter#1855]
:  +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7637]
:     :     +- Union
:     :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:           +- *(4) Scan ExistingRDD[col_1#1808]
:- *(12) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_1 AS found_filter#1860]
:  +- *(12) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(9) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7654]
:     :     +- Union
:     :        :- *(7) Filter (col_1#1800 < 1)
:     :        :  +- *(7) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(8) Filter (col_1#1800 < 1)
:     :           +- *(8) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(11) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:           +- *(10) Filter (col_1#1808 < 1)
:              +- *(10) Scan ExistingRDD[col_1#1808]
:- *(18) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#1880]
:  +- *(18) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(15) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7671]
:     :     +- Union
:     :        :- *(13) Filter (measure_1#1802 < 1.0)
:     :        :  +- *(13) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(14) Filter (measure_1#1802 < 1.0)
:     :           +- *(14) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(17) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(24) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#2022]
:  +- *(24) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(21) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7688]
:     :     +- Union
:     :        :- *(19) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :        :  +- *(19) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(20) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :           +- *(20) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(23) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(30) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#1900]
:  +- *(30) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(27) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7705]
:     :     +- Union
:     :        :- *(25) Filter (col_2#1801 < 1)
:     :        :  +- *(25) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(26) Filter (col_2#1801 < 1)
:     :           +- *(26) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(29) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(36) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2023]
:  +- *(36) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(33) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7722]
:     :     +- Union
:     :        :- *(31) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :        :  +- *(31) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(32) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :           +- *(32) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(35) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(42) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2024]
:  +- *(42) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(39) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7739]
:     :     +- Union
:     :        :- *(37) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :        :  +- *(37) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(38) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :           +- *(38) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(41) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(48) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2028]
:  +- *(48) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(45) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7756]
:     :     +- Union
:     :        :- *(43) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :        :  +- *(43) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(44) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :           +- *(44) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(47) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(54) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#1920]
:  +- *(54) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(51) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7773]
:     :     +- Union
:     :        :- *(49) Filter (measure_2#1803 < 1.0)
:     :        :  +- *(49) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(50) Filter (measure_2#1803 < 1.0)
:     :           +- *(50) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(53) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(60) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2025]
:  +- *(60) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(57) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7790]
:     :     +- Union
:     :        :- *(55) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(55) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(56) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(56) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(59) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(66) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2026]
:  +- *(66) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(63) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7807]
:     :     +- Union
:     :        :- *(61) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :        :  +- *(61) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(62) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :           +- *(62) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(65) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(72) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2029]
:  +- *(72) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(69) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7824]
:     :     +- Union
:     :        :- *(67) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(67) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(68) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :           +- *(68) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(71) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(78) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2027]
:  +- *(78) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(75) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7841]
:     :     +- Union
:     :        :- *(73) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(73) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(74) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(74) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(77) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(84) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2030]
:  +- *(84) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(81) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7858]
:     :     +- Union
:     :        :- *(79) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(79) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(80) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(80) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(83) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(90) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2031]
:  +- *(90) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(87) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7875]
:     :     +- Union
:     :        :- *(85) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(85) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(86) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(86) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(89) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
+- *(96) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2032]
   +- *(96) SortMergeJoin [col_1#1800], [col_1#1808], Inner
      :- *(93) Sort [col_1#1800 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7892]
      :     +- Union
      :        :- *(91) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :        :  +- *(91) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      :        +- *(92) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :           +- *(92) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      +- *(95) Sort [col_1#1808 ASC NULLS FIRST], false, 0
         +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
"""

I'm seeing a significantly longer query plan here, and especially as the number of iterations of the for() loop increases, the performance degrades terribly.

How can I improve my performance?


Solution

  • This is a known limitation of iterative algorithms in Spark. At the moment, every iteration of the loop causes the inner nodes to be re-evaluated and stacked upon the outer df variable.

    This means your query planning process takes O(exp(n)) time, where n is the number of iterations of your loop.

    There's a tool in Palantir Foundry called Transforms Verbs that can help with this.

    Simply import `transforms.verbs.dataframes.union_many` and call it on the full set of DataFrames you wish to materialize (assuming your logic allows for it, i.e. one iteration of the loop doesn't depend on the result of a prior iteration).

    The code above should instead be modified to:

    from pyspark.sql import types as T, functions as F, SparkSession
    from transforms.verbs.dataframes import union_many
    
    # Reuse the active Spark session, or start one if none is running yet.
    spark = SparkSession.builder.getOrCreate()

    # Left-hand input: two integer columns and two float measures, none nullable.
    schema = (
      T.StructType()
      .add("col_1", T.IntegerType(), False)
      .add("col_2", T.IntegerType(), False)
      .add("measure_1", T.FloatType(), False)
      .add("measure_2", T.FloatType(), False)
    )
    data = [
      dict(col_1=1, col_2=2, measure_1=0.5, measure_2=1.5),
      dict(col_1=2, col_2=3, measure_1=2.5, measure_2=3.5),
    ]

    df = spark.createDataFrame(data=data, schema=schema)

    # Right-hand input: a single non-nullable key column holding each key twice.
    right_schema = T.StructType().add("col_1", T.IntegerType(), False)
    right_data = [{"col_1": key} for key in (1, 1, 2, 2)]
    right_df = spark.createDataFrame(data=right_data, schema=right_schema)

    # Union df with itself, then join the result to right_df on col_1.
    df = df.unionByName(df).join(right_df, on="col_1")
    df.show()
    
    """
    +-----+-----+---------+---------+
    |col_1|col_2|measure_1|measure_2|
    +-----+-----+---------+---------+
    |    1|    2|      0.5|      1.5|
    |    1|    2|      0.5|      1.5|
    |    1|    2|      0.5|      1.5|
    |    1|    2|      0.5|      1.5|
    |    2|    3|      2.5|      3.5|
    |    2|    3|      2.5|      3.5|
    |    2|    3|      2.5|      3.5|
    |    2|    3|      2.5|      3.5|
    +-----+-----+---------+---------+
    """
    
    # Print the physical plan of the unioned-and-joined DataFrame (output shown below).
    df.explain()
    
    """
    == Physical Plan ==
    *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
    +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
       :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
       :     +- Union
       :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
       :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
       +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
             +- *(4) Scan ExistingRDD[col_1#1808]
    """
    
    filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
    # Placeholder tag column so the base rows carry the same columns as the tagged slices below.
    df = df.withColumn("found_filter", F.lit(None))

    # Build every tagged slice from the SAME base df. Nothing is unioned inside the
    # loop, so no slice depends on the result of another one.
    # withColumn overwrites the existing placeholder, so no drop/re-add is needed.
    union_dfs = [
      df.filter(F.col(filter_col) < F.lit(1)).withColumn("found_filter", F.lit(filter_col))
      for filter_col in filter_union_cols
    ]

    # Combine all slices in one call, then append them to the base rows once.
    # NOTE(review): confirm against your platform's docs whether union_many expects
    # a single list (as passed here) or the DataFrames as separate arguments.
    df = df.unionByName(
      union_many(union_dfs)
    )
    

    This will optimize your unions and take significantly less time.

    The bottom line: beware of using any union calls inside for/while loops. If you must use this behavior, use the `transforms.verbs.dataframes.union_many` verb to optimize your final set of DataFrames.

    Check out your platform documentation for more information and more helpful Verbs.

    Protip: Use the included optimization over here to further increase your performance