Tags: apache-spark, optimization, pyspark, databricks, delta-lake

How to optimize PySpark code to get better performance


I am trying to fetch when a table (a Delta table) was last optimized using the code below, and I am getting the expected output. This code will run for all the tables that are present in the database.

from pyspark.sql.functions import col

table_name_or_path = "abcd"

df = spark.sql("desc history {}".format(table_name_or_path)) \
    .select("operation", "timestamp") \
    .filter("operation == 'OPTIMIZE'") \
    .orderBy(col("timestamp").desc())
if len(df.take(1)) != 0:
    last_optimize = df.select(col("timestamp").cast("string").alias("timestamp")).first().asDict()
    print(last_optimize["timestamp"])
    last_optimize = last_optimize["timestamp"]
else:
    last_optimize = ""

The above code takes some time and triggers a lot of Spark jobs.

I want to optimize the above code to get better performance.

Is there a way to write this code in a more optimized way? That would be very helpful.


Solution

  • It's better to avoid checks like if len(df.take(1)) != 0, because they may lead to recalculation of the results when you call .first() later. Instead, just limit the number of rows using .limit(1), and check the result of .collect(). Something like this (not tested):

    from pyspark.sql.functions import col

    table_name_or_path = "abcd"

    # limit(1) is part of the query plan, so Spark only needs to produce a single row
    df = spark.sql(f"desc history {table_name_or_path}") \
      .select("operation", "timestamp") \
      .filter("operation == 'OPTIMIZE'") \
      .orderBy(col("timestamp").desc()) \
      .limit(1)

    data = df.collect()
    if len(data) > 0:
        last_optimize = data[0].asDict()
        print(last_optimize["timestamp"])
        last_optimize = last_optimize["timestamp"]
    else:
        last_optimize = ""
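
  • Since the question mentions running this check for every table in the database, below is a minimal, untested sketch of how the single-table check could be wrapped in a loop over the catalog. The database name my_db and the helper name last_optimize_ts are placeholders that do not appear in the original, and the loop assumes every listed table is a Delta table, because DESC HISTORY only works on Delta tables. Each DESC HISTORY call reads the table's transaction log rather than the table data itself, so the per-table cost should stay small.

    from pyspark.sql.functions import col

    def last_optimize_ts(table_name):
        # Timestamp of the most recent OPTIMIZE, or "" if the table was never optimized
        df = spark.sql(f"desc history {table_name}") \
            .select("operation", "timestamp") \
            .filter("operation == 'OPTIMIZE'") \
            .orderBy(col("timestamp").desc()) \
            .limit(1)
        data = df.collect()
        return data[0]["timestamp"] if data else ""

    # 'my_db' is a placeholder database name; temporary views are skipped because
    # they are not Delta tables and have no history
    results = {
        t.name: last_optimize_ts(f"{t.database}.{t.name}")
        for t in spark.catalog.listTables("my_db")
        if not t.isTemporary
    }
    print(results)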