
PySpark - How to apply multiple functions to every column in a dataframe


I want to apply multiple aggregate functions, e.g. min() and max(), to every column of a dataframe.

I tried something like this, but it doesn't seem to work:

exprs = [min(c).alias(c), max(c).alias(c) for c in df.columns]
df2 = df.agg(*exprs)
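
(As written, that comprehension is actually a syntax error: a list comprehension can only yield one expression per iteration. A flattened version, sketched below, does parse, but agg then collapses every aggregate into a single row rather than the two rows I want.)

from pyspark.sql import functions as F

# One expression per iteration, so this parses...
exprs = [f(c) for c in df.columns for f in (F.min, F.max)]
# ...but agg returns a single row: min(col 1), max(col 1), min(col 2), ...
df.agg(*exprs).show()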

Instead, I would like to get something like this, where the first row is the min for each column and the second row is the max:

+-------+-------+-------+-------+
| col 1 | col 2 | col 3 | col 4 |
+-------+-------+-------+-------+
| 123   | 123   | 123   | 123   |
| 123   | 123   | 123   | 123   |
+-------+-------+-------+-------+

Solution

  • You can compute all the min/max aggregations in a single pass, cache the one-row result, and then stack it into two labelled rows.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    _data = [
        (4, 123, 18, 29),
        (8, 5, 26, 187),
        (2, 97, 18, 29),
    ]
    _schema = ['col_1', 'col2', 'col3', 'col_4']
    df = spark.createDataFrame(_data, _schema)

    # Compute every min/max in a single pass; the result is one row with
    # columns min_col_1, ..., max_col_1, ...
    min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]
    max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]

    df2 = df.select(min_vals + max_vals)
    # Cache the one-row result: it is scanned twice below, once per stacked row.
    df2.cache()

    # Relabel the min_*/max_* columns back to the original names and tag each
    # projection with its aggregation type.
    min_cols = (
        [F.lit('min').alias('agg_type')]
        + [F.col(f'min_{c}').alias(c) for c in df.columns]
    )
    max_cols = (
        [F.lit('max').alias('agg_type')]
        + [F.col(f'max_{c}').alias(c) for c in df.columns]
    )
    min_df = df2.select(min_cols)
    max_df = df2.select(max_cols)

    # Stack the two one-row frames; unionByName matches columns by name,
    # not by position.
    result = min_df.unionByName(max_df)
    result.show()
    
    # +--------+-----+----+----+-----+
    # |agg_type|col_1|col2|col3|col_4|
    # +--------+-----+----+----+-----+
    # |     min|    2|   5|  18|   29|
    # |     max|    8| 123|  26|  187|
    # +--------+-----+----+----+-----+
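
  • As a shorter alternative (assuming Spark 2.3+), DataFrame.summary computes the requested statistics for every column and labels each row, though it returns the values as strings rather than in the original numeric types:

    df.summary('min', 'max').show()

    # +-------+-----+----+----+-----+
    # |summary|col_1|col2|col3|col_4|
    # +-------+-----+----+----+-----+
    # |    min|    2|   5|  18|   29|
    # |    max|    8| 123|  26|  187|
    # +-------+-----+----+----+-----+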