
Groupby and percentile distributions: PySpark equivalent of given pandas code


Whenever I want to get distributions in pandas for my entire dataset I just run the following basic code:

x.groupby('y').describe(percentiles=[.1, .25, .5, .75, .9, 1])

which gives me the distribution values for every custom percentile I want. I want to do the exact same thing in PySpark. However, from what I have read, the describe function in PySpark does not allow you to specify percentiles, and the summary function only provides the standard values 0.25, 0.50, and 0.75, so I can't customize the percentiles I would like.
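For reference, here is a minimal, self-contained illustration of what that pandas call produces; the toy frame x, its columns, and the group labels are made up purely for the example:

import numpy as np
import pandas as pd

# hypothetical toy data: one grouping column 'y' and two numeric columns
np.random.seed(0)
x = pd.DataFrame({
    'y': np.random.choice(['a', 'b'], size=100),
    'col1': np.random.rand(100),
    'col2': np.random.rand(100),
})

# one block of count/mean/std/min/percentile/max statistics per group and column
print(x.groupby('y').describe(percentiles=[.1, .25, .5, .75, .9, 1]))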

How do I do the equivalent of the pandas code above, but in PySpark?


Solution

  • You can use percentile_approx (available in PySpark 3.1+) on every column you need; note that we drop the column we are grouping by:

    from pyspark.sql import functions as F

    all_aggregations = []
    for col in sparkDF.drop('y').columns:
        all_aggregations.extend([
            F.percentile_approx(col, 0.10).alias(f'{col}_perc_10'),
            F.percentile_approx(col, 0.25).alias(f'{col}_perc_25'),
            F.percentile_approx(col, 0.50).alias(f'{col}_perc_50'),
            F.percentile_approx(col, 0.75).alias(f'{col}_perc_75'),
            F.percentile_approx(col, 0.90).alias(f'{col}_perc_90'),
            F.max(col).alias(f'{col}_max'),
        ])

    sparkDF_summary_stats = sparkDF.groupby('y').agg(*all_aggregations)
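    percentile_approx also accepts a list of percentages and returns them as a single array column, which keeps the aggregation list shorter; a minimal sketch of that variant (the _percs alias is just illustrative):

    percs = [0.10, 0.25, 0.50, 0.75, 0.90]
    all_aggregations = []
    for col in sparkDF.drop('y').columns:
        all_aggregations.extend([
            # one array column holding all requested percentiles for this column
            F.percentile_approx(col, percs).alias(f'{col}_percs'),
            F.max(col).alias(f'{col}_max'),
        ])

    sparkDF_summary_stats = sparkDF.groupby('y').agg(*all_aggregations)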
    

    For anyone using an earlier version of PySpark (before 3.1, where percentile_approx is not available), you can calculate percentiles using F.expr instead (credit goes to this answer by @Ala Tarighati):

    all_aggregations = []
    for col in sparkDF.drop('y').columns:
        all_aggregations.extend([
            F.expr(f'percentile({col}, array(0.10))').alias(f'{col}_perc_10'),
            F.expr(f'percentile({col}, array(0.25))').alias(f'{col}_perc_25'),
            F.expr(f'percentile({col}, array(0.50))').alias(f'{col}_perc_50'),
            F.expr(f'percentile({col}, array(0.75))').alias(f'{col}_perc_75'),
            F.expr(f'percentile({col}, array(0.90))').alias(f'{col}_perc_90'),
            F.max(col).alias(f'{col}_max'),
        ])

    sparkDF_summary_stats = sparkDF.groupby('y').agg(*all_aggregations)
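    One detail worth flagging: percentile(col, array(0.10)) returns a one-element array rather than a plain scalar. If you would rather get plain double columns, the percentage can be passed as a scalar instead; a small sketch of that change (0.25, 0.50 and 0.75 follow the same pattern):

    all_aggregations = []
    for col in sparkDF.drop('y').columns:
        all_aggregations.extend([
            # scalar percentage -> plain double column, no array wrapping
            F.expr(f'percentile({col}, 0.10)').alias(f'{col}_perc_10'),
            F.expr(f'percentile({col}, 0.90)').alias(f'{col}_perc_90'),
            F.max(col).alias(f'{col}_max'),
        ])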
    

    Using a random sample PySpark DataFrame as input, we can inspect a few of the resulting summary columns chosen at random:

    import numpy as np

    np.random.seed(42)
    # pick four of the summary columns at random for display
    random_cols = np.random.choice(sparkDF_summary_stats.columns, 4).tolist()
    sparkDF_summary_stats.select(random_cols).show()
    
    +------------------+------------------+-------------------+-------------------+
    |         col60_max|    col100_perc_75|      col37_perc_25|      col68_perc_50|
    +------------------+------------------+-------------------+-------------------+
    |0.9888405413036631|0.7153223105291356| 0.3924451074226354|0.23228965409645264|
    |0.9546663568790689|0.7837917844853972|0.26496706155544303| 0.4975660833887259|
    |0.9969494174116696|0.6553831994634532|0.31725917435686757|0.43747492202372906|
    |0.9919627472386433|0.7804711383801549|0.32662190574800876| 0.3862363952990896|
    +------------------+------------------+-------------------+-------------------+
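    As a follow-up, the grouped result holds only one row per value of y, so it is cheap to pull it back to the driver with toPandas() for closer inspection; a short sketch:

    # one row per group, so collecting locally is safe
    summary_pdf = sparkDF_summary_stats.toPandas()
    print(summary_pdf)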