I'm using the following function, built partly from a code snippet I got from this post: Compute size of Spark dataframe - SizeEstimator gives unexpected results,
plus my own calculations for how the data will be distributed across the workers, based on this tutorial: https://www.youtube.com/watch?v=hvF7tY2-L3U. I ended up with the function below:
def get_partitions(df):
    partitions = 1
    df.cache().foreach(lambda x: x)
    df_size_in_bytes = spark._jsparkSession.sessionState() \
        .executePlan(df._jdf.queryExecution().logical(),
                     df._jdf.queryExecution().mode()) \
        .optimizedPlan() \
        .stats() \
        .sizeInBytes()
    kilo_bytes = int(df_size_in_bytes / 1024)
    mega_bytes = int(kilo_bytes / 1024)
    parts = int(mega_bytes / 128)
    if parts <= 0:
        parts = partitions
    else:
        partitions = parts
    return partitions
However, it's giving me the following error:
Py4JError: An error occurred while calling o8705.mode. Trace:
py4j.Py4JException: Method mode([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
I'm working with AWS Glue jobs and interactive sessions: in Glue jobs I get an empty trace, and in Glue interactive sessions I get the error above. What is missing in the calculation? I'm currently using Spark 3.1 and Glue 3.0. Any help will be appreciated!
The Spark version that Glue 3.0 uses (Spark 3.1) does not accept the "mode" parameter for executePlan(); that parameter was only introduced in a later commit to the Spark core.
So, you just need to remove the parameter from the method call:
from math import ceil

def get_partitions(df):
    # Materialize the DataFrame so the optimized plan carries real size statistics
    df.cache().foreach(lambda x: x)
    # Spark 3.1 (Glue 3.0): executePlan() takes only the logical plan, no mode argument
    df_size_in_bytes = spark._jsparkSession.sessionState() \
        .executePlan(df._jdf.queryExecution().logical()) \
        .optimizedPlan() \
        .stats() \
        .sizeInBytes()
    mega_bytes = df_size_in_bytes / 1000**2  # decimal MB; use 1024**2 for binary MiB
    # Aim for roughly 128 MB per partition, with a minimum of one partition
    parts = ceil(mega_bytes / 128)
    return 1 if parts == 0 else parts
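For completeness, a minimal usage sketch, assuming spark is a live SparkSession (as in the snippets above) and using a hypothetical S3 output path:

num_partitions = get_partitions(df)
df.repartition(num_partitions) \
    .write.mode("overwrite") \
    .parquet("s3://my-bucket/output/")  # hypothetical destination
df.unpersist()  # release the cache that get_partitions() created

Calling unpersist() afterwards is optional but worthwhile, since get_partitions() caches the whole DataFrame just to make the plan statistics accurate.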