Search code examples
amazon-web-servicespysparkoptimizationaws-glue

Optimizing pyspark code by calculating Dataframe size


I'm using the following function (partly from a code snippet I got from this post: Compute size of Spark dataframe - SizeEstimator gives unexpected results

and adding my calculations according to what I understood will be distributed within the workers based on this tutorial: https://www.youtube.com/watch?v=hvF7tY2-L3U and ended up with the function below:

def get_partitions(df):
    partitions = 1
    df.cache().foreach(lambda x: x)                                                   
    df_size_in_bytes = spark._jsparkSession.sessionState()\
             .executePlan(df._jdf.queryExecution().logical(),\
             df._jdf.queryExecution().mode()).optimizedPlan()\
             .stats()\
             .sizeInBytes()
    kilo_bytes = int(df_size_in_bytes/1024)
    mega_bytes = int(kilo_bytes/1024)
    parts = int(mega_bytes/128)        
    if parts <= 0:
        parts = partitions
    else:
        partitions = parts   
    return partitions 

Though is giving me the following error:

Py4JError: An error occurred while calling o8705.mode. Trace:
py4j.Py4JException: Method mode([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

I'm working with AWS Glue jobs and interactive sessions, and in glue jobs I get an empty trace, and in Glue interactive sessions I get the error above, what is missing in the calculation. I'm currently using Spark 3.1 and Glue 3.0. any help will be appreciated!


Solution

  • The Spark version that Glue 3.0 uses does not accept the "mode" parameter for the executePlan(), introduced at this commit to the Spark core.

    So, you just need to remove the parameter from the method call:

    from math import ceil
    
    def get_partitions(df):
        partitions = 1
        df.cache().foreach(lambda x: x)                                                   
        df_size_in_bytes = spark._jsparkSession.sessionState() \         
            .executePlan(
                df._jdf.queryExecution().logical()
            ).optimizedPlan() \
            .stats() \
            .sizeInBytes()
    
        mega_bytes = df_size_in_bytes/1000**2
        parts = ceil(mega_bytes/128)        
        partitions = 1 if parts == 0 else parts
    
        return partitions