I am using PySpark to join two tables of about 100k rows each (so this is not a skewed join). The join takes longer than 30 minutes, sometimes over an hour, which makes me think something is wrong. The code is just a regular join:
a = b.join(c, b.id == c.id, "inner").drop(c.id)
I did a lot of searching and tried several things, including:
spark.sql.adaptive.enabled=true
None of them helped.
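For completeness, this is how I set the option; a minimal sketch, assuming the usual SparkSession object named spark:

spark.conf.set("spark.sql.adaptive.enabled", "true")
# Sanity check that the setting actually took effect.
print(spark.conf.get("spark.sql.adaptive.enabled"))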
My question is: does it matter that both tables (pyspark.sql.DataFrame objects) were produced by a UDF? That is the only difference compared with my usual usage.
I used the following UDF logic to prepare the tables:
def func(row):
    # Split the comma-separated string once and unpack the pieces.
    parts = row.split(",")
    row_id = int(parts[0])
    f1 = ", ".join(parts[1:-1])
    f2 = int(parts[-1])
    return (row_id, f1, f2)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

func_udf = udf(func, StructType([
    StructField("id", IntegerType(), True),
    StructField("f1", StringType(), True),
    StructField("f2", IntegerType(), True)
]))
df = df.withColumn("Result", func_udf(col("data")))
df = df.drop(df.data).select("Result.*")
This df is the table used in the join.
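For comparison, I believe the same parsing could also be expressed with built-in column functions instead of a Python UDF, which would keep the work inside the JVM. This is only a rough sketch I have not benchmarked (Spark 2.4+), assuming data is a comma-separated string column with the integer id first and the integer f2 last:

from pyspark.sql.functions import expr

df_native = (
    df
    .withColumn("parts", expr("split(data, ',')"))
    .withColumn("id", expr("cast(parts[0] as int)"))
    # Join everything between the first and last element, like the UDF does.
    .withColumn("f1", expr("concat_ws(', ', slice(parts, 2, size(parts) - 2))"))
    .withColumn("f2", expr("cast(element_at(parts, -1) as int)"))
    .drop("parts", "data")
)

I have not confirmed whether this changes the join time, but it removes the Python serialization step from the plan.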
Any troubleshooting ideas are appreciated. Thank you.
P.S. Table b has 3 columns and table c has 6, so neither is wide. Also, if I shrink the size to 10k rows, the join works as expected.
I figured it out and would like to share my experience below: calling
df.cache()
on the prepared tables before the join is helpful.
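Concretely, this is roughly what the fix looks like; a minimal sketch, assuming b and c are the two UDF-prepared DataFrames from above:

# Materialize the UDF output once so it is not recomputed
# by the stages that feed the join.
b = b.cache()
c = c.cache()

a = b.join(c, b.id == c.id, "inner").drop(c.id)
a.count()  # an action triggers both the caching and the join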