Tags: pyspark, apache-spark-sql, query-performance

How to improve performance when joining PySpark DataFrames


I have two PySpark DataFrames: the first contains ~500,000 rows and the second ~300,000 rows. I perform two joins; in the second join, each cell from the second DataFrame (300,000 rows) is compared against every cell in the first DataFrame (500,000 rows).

So the join is very slow. I broadcast the DataFrames before joining.

Test 1:

df_join = df1.join(F.broadcast(df2), df1.String.contains(df2["search.subString"]), "left") 

The job took many hours to finish.

Test 2:

df_join = F.broadcast(df1).join(F.broadcast(df2), df1.String.contains(df2["search.subString"]), "left")

This runs even more slowly than the first version above, so the performance is very bad.
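For reference, the physical plan shows which join strategy Spark picks for this non-equi contains() condition; with a broadcast hint it typically falls back to a broadcast nested loop join, which compares every row pair. A minimal sketch, assuming df1 and df2 as above:

from pyspark.sql import functions as F

# Same join as Test 1; explain() prints the physical plan without running the job
df_join = df1.join(F.broadcast(df2), df1.String.contains(df2["search.subString"]), "left")
df_join.explain()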

I also tried caching the DataFrames before the join.

I used df.cache() on each DataFrame, but the performance is still not good.

I tried to persist with MEMORY_ONLY:

df.persist(MEMORY_ONLY) ==> NameError: global name 'MEMORY_ONLY' is not defined
df.persist(StorageLevel.MEMORY_ONLY) ==> NameError: global name 'StorageLevel' is not defined

How can I persist the DataFrames in memory?

Can you please suggest a solution to improve the performance?

Thanks in advance.


Solution

  • Use

    df = df.cache()
    print(df.count())

    Basically, you need to call an action (such as count()) to get the effect of caching, because cache() itself is lazy.
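  • For the persist attempt, the NameError just means StorageLevel was never imported; it is available from the top-level pyspark package. A minimal sketch combining both points, assuming df1 and df2 from the question:

    from pyspark import StorageLevel

    # cache() is shorthand for persist() with the default storage level
    df1 = df1.cache()
    df2 = df2.persist(StorageLevel.MEMORY_ONLY)

    # Caching is lazy; trigger an action so the data is actually materialized
    print(df1.count())
    print(df2.count())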