Tags: pandas, apache-spark, azure-databricks, spark-koalas

Pandas to Koalas does not solve spark.rpc.message.maxSize exceeded error


I have an existing Databricks job that heavily uses Pandas, and the code snippet below gives the error "org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 101059:0 was 1449948615 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values"

The current code snippet is:

    normalized_df = pd.DataFrame(data=normalized_data_remnan, columns=['idle_power_mean', 'total_eng_idle_sec', 'total_veh_idle_sec', 'total_sec_on', 'total_sec_load', 'positive_power_mean', 'time_fraction_eng_idle_pcnt', 'time_fraction_veh_idle_pcnt', 'negative_power_mean', 'mean_eng_idle_sec', 'mean_veh_idle_sec', 'mean_stand_still', 'num_start_stops', 'num_power_jump', 'positive_power_med', 'load_speed_med'])

where normalized_data_remnan is an ndarray output by scipy.stats.zscore. I thought moving this to Koalas would solve the issue, since Koalas uses distributed computing, so I converted the code as below:

    import databricks.koalas as ks
    normalized_df = ks.DataFrame(data=normalized_data_remnan, columns=['idle_power_mean', 'total_eng_idle_sec', 'total_veh_idle_sec', 'total_sec_on', 'total_sec_load', 'positive_power_mean', 'time_fraction_eng_idle_pcnt', 'time_fraction_veh_idle_pcnt', 'negative_power_mean', 'mean_eng_idle_sec', 'mean_veh_idle_sec', 'mean_stand_still', 'num_start_stops', 'num_power_jump', 'positive_power_med', 'load_speed_med'])

But even after this conversion, I am getting the same error. Do you have any clue about this error? I am considering increasing spark.rpc.message.maxSize to 2 GB; what is the maximum value of this parameter? My driver node has 128 GB of memory and 6 cores; each worker has 64 GB and 32 cores, and there are 8 workers in total.

Thanks, Nikesh


Solution

  • Usually, this error message results from sending some huge item from the driver to the executors.

    spark.rpc.message.maxSize: the largest message (in MiB) that can be delivered in "control plane" communication. If you are getting alerts about the RPC message size, increase this value. Its default is 128.

    Setting this property (spark.rpc.message.maxSize) in the Spark configuration when you start the cluster may resolve this error.
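    On Databricks, one place to set this is the cluster's Spark config (the exact value of 512 here is only an illustration; the property key is the standard Spark one):

        spark.rpc.message.maxSize 512

    The value is in MiB. Note that Spark caps this setting below 2047 MiB (the message size is backed by a 32-bit byte count), so "2 GB" is effectively the upper limit, and a task serialized to ~1.4 GB as in your error is close to that ceiling even after raising the setting.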

    To lower the size of each Spark RPC message, you can break the huge list into numerous smaller pieces by increasing the partition number.

    Example:

    largeList = [...]   # This is a large list
    partitionNum = 100  # Increase this number if necessary
    rdd = sc.parallelize(largeList, partitionNum)
    # toDS() is Scala-only; in PySpark, wrap elements in tuples and use toDF()
    df = rdd.map(lambda x: (x,)).toDF(["value"])
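    The snippet above needs a live Spark session, so here is a plain-Python sketch (no Spark required; `split_into_partitions` is a hypothetical helper name, not a Spark API) of the chunking that `sc.parallelize(largeList, partitionNum)` performs: the data is cut into roughly equal slices, and each slice travels in its own message instead of one giant one.

    ```python
    def split_into_partitions(data, num_partitions):
        """Split a sequence into num_partitions roughly equal chunks,
        mimicking how sc.parallelize spreads elements across partitions
        so no single RPC message carries the whole list."""
        base, remainder = divmod(len(data), num_partitions)
        chunks, start = [], 0
        for i in range(num_partitions):
            # The first `remainder` chunks get one extra element
            end = start + base + (1 if i < remainder else 0)
            chunks.append(data[start:end])
            start = end
        return chunks

    large_list = list(range(10))
    parts = split_into_partitions(large_list, 3)
    # parts is [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]: three small messages
    # instead of one, keeping each well under spark.rpc.message.maxSize
    ```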