Tags: python, apache-spark, pyspark

PySpark RAM leakage


My Spark code has recently been causing what looks like RAM leakage. For instance, before running any script, when I run top I can see 251 GB total memory and about 230 GB of free + used memory.

When I run my Spark job through spark-submit, regardless of whether the job completes or ends with an exception, the free + used memory is much lower than at the start. This is one sample of my code:

    from pyspark.sql import SparkSession

    def read_df(spark, jdbc_url, table_name, jdbc_properties ):
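        # read a Postgres table into a Spark DataFrame over JDBC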
        df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=jdbc_properties)
        return df
    
    
    def write_df(result, table_name, jdbc_properties):
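        # write the DataFrame back to Postgres over JDBC
        # (note: "mode" passed through .options() is likely ignored; the save mode
        # is normally set with .mode("overwrite") on the writer)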
        result = result.repartition(50)
        result.write.format('jdbc').options(
            url=jdbc_properties['jdbc_url'],
            driver="org.postgresql.Driver",
            user=jdbc_properties["user"],
            password=jdbc_properties["password"],
            dbtable=table_name,
            mode="overwrite"
        ).save()


    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .config("spark.driver.extraClassPath", "postgresql-42.5.2.jar").config("spark.executor.extraClassPath","postgresql-42.5.2.jar") \
            .config("spark.local.dir", "/shared/hm31") \
            .config("spark.master", "local[*]") \
            .getOrCreate()

        spark.sparkContext.setLogLevel("WARN")
    
        parquet_path = '/shared/hossein_hm31/embeddings_parquets'
    
        try:
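            # jdbc_url and jdbc_properties are assumed to be defined elsewhere (omitted from this snippet)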
    
            unique_nodes = read_df(spark, jdbc_url, 'hm31.unique_nodes_cert', jdbc_properties)
            df = spark.read.parquet(parquet_path)
    
    
            unique_nodes.createOrReplaceTempView("unique_nodes")
            df.createOrReplaceTempView("all_embeddings")
    
            sql_query = """
                select u.node_id, a.embedding from unique_nodes u inner join all_embeddings a on u.pmid = a.pmid
            """
            result = spark.sql(sql_query)

            print("num", result.count())
            result.repartition(10).write.parquet('/shared/parquets_embeddings/')
    
            write_df(result, 'hm31.uncleaned_embeddings_cert', jdbc_properties)
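
            # try to release cached/persisted DataFrames before stopping the session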
            spark.catalog.clearCache()
            unique_nodes.unpersist()
            df.unpersist()
            result.unpersist()
    
            spark.stop()
            exit(0)
        except:
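            # note: a bare except also catches the SystemExit raised by exit(0) in the
            # try block, so this branch can run even when the job succeeds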
            print('Error')
            spark.catalog.clearCache()
            unique_nodes.unpersist()
            df.unpersist()
            spark.stop()
            exit(0)
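
        # the block below duplicates the except branch and is unreachable,
        # since both branches above end with exit(0)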
        print('Error')
        spark.catalog.clearCache()
        unique_nodes.unpersist()
        df.unpersist()
        spark.stop()
        exit(0)

Here I tried to remove the cached DataFrames. This RAM leakage seems to require a server restart to clear, which is inconvenient.

This is the command I run:

    spark-submit --master local[50] --driver-class-path ./postgresql-42.5.2.jar --jars ./postgresql-42.5.2.jar --driver-memory 200g --conf "spark.local.dir=./logs" calculate_similarities.py

And this is the top output, where you can see that free + used memory is much less than the total, while it was around 230 GB before I ran my Spark job. The processes are sorted by memory usage, and you can see there is no memory-intensive process running after Spark ended with an exception.

I should add that the machine does not have a system-wide Spark installation. It has Java 11, and I run PySpark just by importing the pyspark package.

[screenshot of top output]

Thanks

P.S.: unique_nodes is around 0.5 GB on Postgres. The df = spark.read.parquet(parquet_path) line reads 38 Parquet files, each around 3 GB. After joining, the result is around 8 GB.


Solution

  • There is no "RAM leakage" here. You're misinterpreting what top is displaying:

    • total is the total amount of memory (no surprises)
    • free is the amount of memory that is unused for any purpose
    • used is what the kernel currently has allocated, e.g. due to requests from applications
    • the sum of free + used is not total, because there is also buff/cache. This is the amount of memory currently used for "secondary" purposes, especially caching data that lives on disk and of which the kernel knows it already holds an exact copy in memory; as long as there is no memory pressure from used, the kernel will try to keep its buff/cache.
    • avail is what is readily available to be used by applications, approximately the sum of free plus the reclaimable part of buff/cache

    Your top screenshot shows a large amount of memory allocated to buff/cache, which is most likely data that was read previously and which the kernel keeps around in case it is needed again; there is no "leakage" here, because the kernel will evict these cached memory pages as soon as applications need the space. Also notice that the avail number is still around 234 GB, which is almost exactly what you expected from free + used - you just didn't take buff/cache into account.
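
    If you want to double-check this on the server itself, here is a minimal sketch (assuming a Linux machine; it simply parses /proc/meminfo, which is where top gets its numbers) that prints the values top summarizes as free, buff/cache and avail Mem:

        # minimal sketch: read /proc/meminfo (Linux) and print the numbers that
        # top summarizes as total, free, buff/cache and avail Mem
        def read_meminfo():
            info = {}
            with open('/proc/meminfo') as f:
                for line in f:
                    key, value = line.split(':', 1)
                    info[key] = int(value.split()[0])  # values are reported in kB
            return info

        if __name__ == '__main__':
            m = read_meminfo()
            gib = 1024 * 1024  # kB -> GiB
            print(f"MemTotal      {m['MemTotal'] / gib:6.1f} GiB")
            print(f"MemFree       {m['MemFree'] / gib:6.1f} GiB")
            # Buffers + Cached is roughly what top shows as buff/cache
            print(f"buff/cache    {(m['Buffers'] + m['Cached']) / gib:6.1f} GiB")
            # MemAvailable is what top shows as avail Mem: free plus reclaimable cache
            print(f"MemAvailable  {m['MemAvailable'] / gib:6.1f} GiB")

    Run it before and after your Spark job: you should see buff/cache grow by roughly the amount you thought had leaked, while MemAvailable stays close to the ~234 GB that top reports as avail.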