Tags: python, pyspark, count, apache-spark-sql, lazy-evaluation

Pyspark - df.cache().count() taking forever to run


I'm trying to force eager evaluation in PySpark, using the cache-and-count approach I read about online:

spark_df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

spark_df.cache().count()

However, when I run the code, the cache/count step takes forever. My data is relatively small (2.7 GB, 15 million rows), but after 28 minutes of running I decided to kill the job. For comparison, reading the same data with pandas.read_sql() took only 6 minutes 43 seconds.

The machine I'm running the code on is fairly powerful (20 vCPU, 160 GB RAM, Windows OS). I believe I'm missing a step to speed up the count.

Any help or suggestions are appreciated.


Solution

  • When you read with pandas, it uses as much of the machine's available memory as it needs (you mentioned 160 GB, which is far larger than the data itself at ~3 GB).

    It's not the same with Spark. When you start your Spark session, you typically have to specify up front how much memory each executor (and the driver, and the application master if applicable) should use; if you don't, it defaults to 1g according to the Spark documentation. So the first thing to do is give more memory to your executors and driver.
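
    As a minimal sketch of what that looks like when building the session (the 16g values are placeholders, not a recommendation; size them to your machine, and set them before the SparkContext/JVM is created):

    from pyspark.sql import SparkSession

    # Placeholder memory values -- tune to your machine (20 vCPU / 160 GB here).
    # These must be set before the SparkContext/JVM starts to take effect.
    spark = (
        SparkSession.builder
        .appName("jdbc-read")
        .config("spark.driver.memory", "16g")    # default is 1g if unset
        .config("spark.executor.memory", "16g")  # default is 1g if unset
        .getOrCreate()
    )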

    Second, reading from JDBC with Spark is tricky: whether it is slow depends on the number of executors (and tasks), which depends on how many partitions the RDD read from the JDBC connection has, and the number of partitions in turn depends on your table, your query, columns, conditions, etc. One way to change this behavior and get more partitions, more tasks, and more executors is via these options: numPartitions, partitionColumn, lowerBound, and upperBound.

    • numPartitions is the number of partitions (and hence the maximum number of parallel read tasks and concurrent JDBC connections)
    • partitionColumn is a numeric, date, or timestamp column that Spark uses to split the read into ranges
    • lowerBound is the minimum value of partitionColumn used to compute the partition stride
    • upperBound is the maximum value of partitionColumn used to compute the partition stride (the bounds only decide how the ranges are split; they do not filter out rows)

    You can read more here: https://stackoverflow.com/a/41085557/3441510, but the basic idea is that you want a reasonable number of executors (controlled by numPartitions), each processing an evenly distributed chunk of the data (defined by partitionColumn, lowerBound, and upperBound). A sketch of such a read follows.
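
    For example (the column name "id" and the bounds below are hypothetical; substitute a column and range from your own table):

    # "id", the bounds, and numPartitions are placeholders -- adapt to your data.
    spark_df = spark.read.jdbc(
        url=jdbcUrl,
        table=pushdown_query,
        column="id",            # partitionColumn: a numeric/date/timestamp column
        lowerBound=1,
        upperBound=15000000,    # ~15 million rows in this example
        numPartitions=20,       # roughly one partition per vCPU here
        properties=connectionProperties,
    )

    spark_df.cache().count()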