Search code examples
pythondataframepysparkapache-spark-sqlaggregate

pySpark compute max of multiple columns while ignoring NaN


I'm trying to compute the max (or any agg function) for multiple columns in a pyspark dataframe. However, since these columns have some NaNs, the result for the max aggregator is always NaN. Is there any way to ignore NaN while making this computation?

Here's the code that I'm running:

panel = pd.DataFrame({'col1': [2,3,4,np.nan],
                         'col2': [1,np.nan,4,np.nan],
                         'col3': [2,7,1,np.nan],
                         'col4': [np.nan,3,4,np.nan]})
sq = context.sql
sparkDF = sq.createDataFrame(panel) 

# Compute max
mapp = {c: 'max' for c in panel.columns} # {'col1': 'max', col2': 'max', ...}
vals = panel.agg(mapp).collect()[0]
print(vals)

PS: I can't just drop rows containing a NaN, because other columns might have a valid value. I would also want to avoid having to iterate over every column and computing the max value 'manually', as it would result in a big performance impact.


Solution

  • You can try max with fill.

    from pyspark.sql import functions as F
    sparkDF = sparkDF.na.fill(0).select([F.max(x).alias(f'max_{x}') for x in sparkDF.columns])
    
    sparkDF.show()
    
    # +--------+--------+--------+--------+
    # |max_col1|max_col2|max_col3|max_col4|
    # +--------+--------+--------+--------+
    # |     4.0|     4.0|     7.0|     4.0|
    # +--------+--------+--------+--------+
    

    ====================================================

    Update:

    For calculating average with ignoring NaN, you cannot use the regular avg as this includes NaNs. In order to calculate average by dropping NaN, you can use sum and count with a condition.

    sparkDF = sparkDF.select([
        (F.sum(F.when(~F.isnan(x), F.col(x))) / F.count(F.when(~F.isnan(x), True))).alias(f'avg_{x}')
        for x in sparkDF.columns
    ])