
How to aggregate columns dynamically in pyspark


I want to calculate the percentage of non-missing values, pct_<original_name>_valid, for each of the input columns. In this example there are only 2 columns, so it's easy to script the code below by hand. But when there are 30+ columns, I don't want to do this manually. Is it even possible to do this dynamically (for instance, by taking a list of column names as input)?

import pyspark.sql.functions as F

d = [{'name': 'Alice', 'age': 1}, {'name': 'Bae', 'age': None}]
df = spark.createDataFrame(d)

df.withColumn('name_valid', F.when(F.col("name").isNotNull(),1).otherwise(0))\
.withColumn('age_valid', F.when(F.col("age").isNotNull(),1).otherwise(0))\
.agg(
    (100.0*F.sum(F.col("name_valid"))/F.count(F.lit(1))).alias("pct_name_valid"),
    (100.0*F.sum(F.col("age_valid"))/F.count(F.lit(1))).alias("pct_age_valid")
)\
.show()

Here is the result:

+--------------+-------------+
|pct_name_valid|pct_age_valid|
+--------------+-------------+
|         100.0|         50.0|
+--------------+-------------+

As mentioned earlier, I don't want to do this manually for all 30+ columns. Is there any way I can do something like:

my_output = calculate_non_missing_percentage(df, my_columns = ["name", "age", "gender", "school", "color"])

Solution

  • Here is how I find null (and NaN) values dynamically in my code:

    from pyspark.sql.functions import isnan, isnull, when, count, sum as sql_sum

    total_count = df.count()

    # Fraction of NaN values per column. Note that isnan only works on
    # numeric columns and does not detect nulls.
    nan_values = df.select(
        [(count(when(isnan(c), c)) / total_count).alias(c) for c in df.columns]
    )

    # Another way, counting nulls instead of NaNs (ref neobot):
    null_values = df.select(
        [(sql_sum(when(isnull(c), 1).otherwise(0)) / total_count).alias(c) for c in df.columns]
    )
    

    The trick is in creating the list beforehand: you build the list of expressions you want to apply to the columns and then pass that list to select.
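
    Putting that same trick together into the helper the question asks for, here is a minimal sketch (the name calculate_non_missing_percentage comes from the question; this is one possible implementation, not the only one):

    import pyspark.sql.functions as F

    def calculate_non_missing_percentage(df, my_columns):
        # F.count(c) counts only non-null values, so no when/otherwise is
        # needed; F.count(F.lit(1)) would count all rows instead.
        total = df.count()
        return df.agg(
            *[(100.0 * F.count(c) / total).alias(f"pct_{c}_valid") for c in my_columns]
        )

    my_output = calculate_non_missing_percentage(df, my_columns=["name", "age"])
    my_output.show()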

    I use this to count distinct values in my data:

    from pyspark.sql.functions import countDistinct, col

    df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).show()

    (This assumes the columns are string columns; I didn't add that condition here.)
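
    If you need those counts back in plain Python rather than in a DataFrame, one way (a sketch, not the only option) is to collect the single aggregate row into a dict:

    distinct_counts = df.agg(
        *(countDistinct(col(c)).alias(c) for c in df.columns)
    ).collect()[0].asDict()
    # For the sample df above this gives {'age': 1, 'name': 2},
    # since countDistinct ignores nulls.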