apache-spark, apache-iceberg

Why is Spark SQL running extremely slow?


I'm using Spark SQL to perform a simple query on my Iceberg table. Some info about the table itself, since it might be useful (state at the moment of posting this question; a sketch after the list shows one way to check these figures):

  • It has 7600 rows and 115 columns
  • It has 28 partitions, partitioned hourly by column "time"
  • It has 948 snapshots
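(For reference, figures like these can be checked from Spark itself via Iceberg's metadata tables. A minimal sketch, assuming the same table name and an already-created session named "spark" as in the code below:)

// Count snapshots and partitions via Iceberg's metadata tables
spark.sql("SELECT count(*) AS snapshot_count FROM myschema.mytable.snapshots").show();
spark.sql("SELECT count(*) AS partition_count FROM myschema.mytable.partitions").show();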

Code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

long start = System.nanoTime();
SparkSession spark = getSparkSession();
System.out.println("Session creation took (ms): " + (System.nanoTime() - start) / 1000000);

// Select a ~100-minute window from the hourly "time" partitions
Dataset<Row> data = spark.sql("SELECT * FROM myschema.mytable WHERE time BETWEEN CAST('2024-07-10 14:00:00.0' AS TIMESTAMP) AND CAST('2024-07-10 15:40:00.0' AS TIMESTAMP)");

System.out.println("Count: " + data.count());
System.out.println("Partitions: " + data.rdd().getNumPartitions());
System.out.println("Execution took (ms): " + (System.nanoTime() - start) / 1000000);

spark.stop();

Output:

Session creation took (ms): 4333
Count: 0
Partitions: 143
Execution took (ms): 107029

Important note: this number of partitions goes up as I load more and more data into the source table. If I run the same query again after some time, the count remains 0, but the number of partitions is higher.

Two big questions are:

1. Why is this simple query so slow (~100 seconds) even when I deliberately extract 0 rows from the source table by pointing to timestamps in the future? When I run the same query via Trino, it takes 1-2 seconds. Also, when I set proper timestamps and extract e.g. 500 rows, it makes no difference; it still runs for ~100 seconds.

2. What is this number of partitions, and why does it keep increasing? Why is it e.g. 143 when the table has 28 partitions? (See the side note below.)
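(Side note for reading the numbers above: data.rdd().getNumPartitions() reports RDD partitions, i.e. the scan tasks Spark plans, not Iceberg table partitions, so the two counts need not match. A sketch for inspecting what the scan actually plans, using the "data" Dataset from the code above:)

// Prints the parsed, analyzed, optimized and physical plans,
// including the scan node and any pushed-down filters
data.explain(true);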


UPDATE (10.07.2024):

When I change my query to a plain "SELECT * FROM myschema.mytable", it executes roughly 10-12x faster.


Solution

  • It turned out Spark had a problem with my "SELECT ..." statement for some reason, while I had focused mostly on fixing the partitioning.

    I formatted my times as "yyyy-MM-dd HH:mm:ss" and replaced "spark.sql(...)" with:

    // requires: import static org.apache.spark.sql.functions.col;
    Dataset<Row> data = spark.read()
            .format("iceberg")
            .load(getSourceTable())
            .filter(col("time").between(startTime, endTime));

    After this, everything works as expected.
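
    For completeness, a minimal sketch of how the startTime/endTime strings might be produced in the "yyyy-MM-dd HH:mm:ss" format mentioned above (the formatting code itself is an assumption; only the pattern and the time window come from the question):

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    // Hypothetical construction of the filter boundaries used above
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    String startTime = LocalDateTime.of(2024, 7, 10, 14, 0).format(fmt); // "2024-07-10 14:00:00"
    String endTime = LocalDateTime.of(2024, 7, 10, 15, 40).format(fmt);  // "2024-07-10 15:40:00"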