python · dataframe · caching · pyspark · persist

How to: PySpark DataFrame persist usage and reading back


I'm quite new to PySpark, and I'm getting the following error:

Py4JJavaError: An error occurred while calling o517.showString. I've read that this is due to a lack of memory:
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

So, I've been reading that a workaround for this situation is to use df.persist() and then read the persisted df back, so I would like to know:

  • Given a for loop in which I do some .join operations, should I use the .persist() inside the loop or at the end of it? e.g.
    for col in columns:
       df_AA = df_AA.join(df_B, df_AA[col] == 'some_value', 'outer').persist()
    
    --> or <--
    
    for col in columns:
       df_AA = df_AA.join(df_B, df_AA[col] == 'some_value', 'outer')
    df_AA.persist()
    
  • Once I've done that, how should I read it back? df_AA.unpersist()? sqlContext.read.some_thing(df_AA)?

I'm really new to this, so please, try to explain as best as you can.

I'm running on a local machine (8 GB RAM), using Jupyter notebooks (Anaconda); Windows 7; Java 8; Python 3.7.1; PySpark v2.4.3.


Solution

  • Spark is a lazily evaluated framework, so none of the transformations (e.g. join) are executed until you call an action.

    So go ahead with what you have done, persisting once after the loop:

        from pyspark import StorageLevel

        # Build up the joins lazily; nothing is executed yet
        for col in columns:
            df_AA = df_AA.join(df_B, df_AA[col] == 'some_value', 'outer')

        # Cache the final DataFrame, spilling whatever does not fit in memory to disk;
        # the first action (show) triggers the computation and materializes the cache
        df_AA.persist(StorageLevel.MEMORY_AND_DISK)
        df_AA.show()
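
    As for reading it back: you don't re-read the persisted DataFrame through sqlContext, and unpersist() is not for reading (it releases the cache). You simply keep using the same df_AA variable; once an action has materialized the cache, later actions are served from it. A minimal sketch, assuming the df_AA from above (the count and show(5) calls are only illustrative, not from your code):

        # Later actions on the same variable reuse the cached data instead of
        # recomputing the joins from scratch
        print(df_AA.count())
        df_AA.show(5)

        # When you no longer need the cached copy, release it explicitly
        df_AA.unpersist()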
    

    There are multiple persist options (storage levels) available, and choosing MEMORY_AND_DISK will spill the data that cannot be held in memory to disk.
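
    For reference, the storage levels live on pyspark.StorageLevel, and you can inspect which level a DataFrame is currently persisted with via its storageLevel property. A small illustrative sketch (the comment list is not exhaustive):

        from pyspark import StorageLevel

        # A few common choices (not exhaustive):
        #   StorageLevel.MEMORY_ONLY      - keep in memory only; recompute partitions that don't fit
        #   StorageLevel.MEMORY_AND_DISK  - keep in memory, spill the rest to disk
        #   StorageLevel.DISK_ONLY        - cache everything on disk

        print(df_AA.storageLevel)   # which level df_AA is currently persisted with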

    Also, GC errors can be the result of too little driver memory being provided for the Spark application to run.
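
    If the GC errors persist on your 8 GB machine, you can try giving the driver more memory via spark.driver.memory. A hedged sketch of one way to do this from a notebook (the 6g value and the persist-example app name are just placeholders for your setup); note that the setting only takes effect when the driver JVM starts, so restart the kernel and create the session with this config before doing anything else:

        from pyspark.sql import SparkSession

        # Set driver memory when the session is first created; it has no effect on an
        # already-running SparkContext. Alternatively, pass --driver-memory 6g to spark-submit.
        spark = (SparkSession.builder
                 .master('local[*]')
                 .appName('persist-example')
                 .config('spark.driver.memory', '6g')
                 .getOrCreate())

        print(spark.sparkContext.getConf().get('spark.driver.memory'))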