apache-spark · pyspark · databricks

Big differences in join time on similar tables


I have two tables, described below. I generally join these tables (inner join) with other, smaller tables. However, upon checking the execution time, Table 2 takes approximately 9 minutes, whereas Table 1 takes about 2 hours. I also noticed that the execution summary for Table 1 joins has no ColumnarToRow node and no second WholeStageCodegen node. I use a broadcast join in both cases.

Table 1: Size 8TB Column count 151

Table 2: Size 5.5TB Column count 90

The join columns are the same, and both tables are liquid clustered on that column. Both tables have been optimized.

Join with Table1 execution summary

Completed in 6292595 ms
Node(row)
*WholeStageCodegen(1)
-*(5):ColumnarToRow(253,461)
-*(6):Filter(253,461)
(1):Scan parquet **table1** (10,305,644,145)
(2):Filter(9,622,877,502)
(3):Project
(4):Scan parquet **smaller_table** (253,461)
(7):Exchange
(9):BroadcastHashJoin(28,206,425)
(10):Project
(11):WriteFiles
(12):ExecuteWriteIntoDeltaCommand
(13):ResultQueryStage
(23):AdaptiveSparkPlan

Join with Table2 execution summary

Completed in 567963 ms
Node (row)
*WholeStageCodegen(1)
-*(6):ColumnarToRow (253,461)
-*(7):Filter (253,461)
*WholeStageCodegen(2)
-*(2):ColumnarToRow (19,364,719,597)
-*(3):Filter (17,548,253,303)
-*(4):Project
-*(10):BroadcastHashJoin (65,712,508)
-*(11):Project
(1):Scan parquet **table2** (19,364,719,597)
(5):Scan parquet **smaller_table**  (253,461)
(8):Exchange
(12):WriteFiles
(13):ExecuteWriteIntoDeltaCommand
(14):ResultQueryStage
(24):AdaptiveSparkPlan

Thanks


Solution

  • I couldn't find this documented anywhere, but Spark appears to take the column count into account when deciding whether to apply the ColumnarToRow and WholeStageCodegen optimizations. When I select only the ~90 columns I actually need from Table 1, the join runs as fast as Table 2. Thanks for the comments.