Search code examples
pysparkpercentile

Percentile.INC in Pyspark


I need to replicate Percentile.INC functionality of Excel with Pyspark. Following is my input dataframe (expecting this to be a very large dataset)

df_schema = StructType([StructField('FacilityKey', StringType(), True), \
                        StructField('ItemKey', StringType(), True), \
                        StructField("ItemValye", FloatType(), True)])

df_data =  [('F1', 'I1', 2.4),('F2', 'I1', 3.17),('F3', 'I1', 4.25)]

input_df = spark.createDataFrame(df_data, df_schema)

I need to calculate interpolated values for all stops between Percentile 1 through 99 on the above dataset. Expected result (Sampled 1 through 10)

| PercentileRank | Item | PerformanceScore |
|----------------|------|------------------|
| 1              | I1   | 2.4154           |
| 2              | I1   | 2.4308           |
| 3              | I1   | 2.4462           |
| 4              | I1   | 2.4616           |
| 5              | I1   | 2.477            |
| 6              | I1   | 2.4924           |
| 7              | I1   | 2.5078           |
| 8              | I1   | 2.5232           |
| 9              | I1   | 2.5386           |
| 10             | I1   | 2.554            |
| 11             | I1   | 2.5694           |

I am able to replicate the results using numpy within python the following way

# 1D array  
arr = [2.4, 3.17, 4.25] 
print("arr : ", arr)  
# print("1st percentile of arr : ",  
#        np.percentile(arr, 1)) 
# print("25th percentile of arr : ", 
#        np.percentile(arr, 25)) 
# print("75th percentile of arr : ", 
#        np.percentile(arr, 75)) 

for i in range(1,99):
  print("%s percentile of arr : ",i,np.percentile(arr, i))

Unable to figure out how to calculate the same with Pyspark. Thank you for your help in advance.


Solution

  • Check if this is helpful -

    Load the test data

         val df = Seq(("F1", "I1", 2.4),("F2", "I1", 3.17),("F3", "I1", 4.25))
          .toDF("FacilityKey", "ItemKey", "ItemValue")
        df.show(false)
        df.printSchema()
    
        /**
          * +-----------+-------+---------+
          * |FacilityKey|ItemKey|ItemValue|
          * +-----------+-------+---------+
          * |F1         |I1     |2.4      |
          * |F2         |I1     |3.17     |
          * |F3         |I1     |4.25     |
          * +-----------+-------+---------+
          *
          * root
          * |-- FacilityKey: string (nullable = true)
          * |-- ItemKey: string (nullable = true)
          * |-- ItemValue: double (nullable = false)
          */
    

    Compute percentile for range of quartile

    
        df
          .groupBy("ItemKey")
          .agg(
            expr(s"percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
              .as("percentile"))
          .withColumn("percentile", explode($"percentile"))
          .show(false)
    
        /**
          * +-------+------------------+
          * |ItemKey|percentile        |
          * +-------+------------------+
          * |I1     |2.4154            |
          * |I1     |2.4307999999999996|
          * |I1     |2.4461999999999997|
          * |I1     |2.4616000000000002|
          * |I1     |2.4770000000000003|
          * |I1     |2.4924            |
          * |I1     |2.5078            |
          * |I1     |2.5232            |
          * |I1     |2.5385999999999997|
          * |I1     |2.554             |
          * |I1     |2.5694            |
          * |I1     |2.5847999999999995|
          * |I1     |2.6002            |
          * |I1     |2.6156            |
          * |I1     |2.631             |
          * |I1     |2.6464            |
          * |I1     |2.6618            |
          * |I1     |2.6772            |
          * |I1     |2.6925999999999997|
          * |I1     |2.708             |
          * +-------+------------------+
          * only showing top 20 rows
          */
    

    Execution plan

        df
          .groupBy("ItemKey")
          .agg(
            expr(s"percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
              .as("percentile"))
          .withColumn("percentile", explode($"percentile"))
          .explain()
    
        /**
          * == Physical Plan ==
          * Generate explode(percentile#58), [ItemKey#8], false, [percentile#67]
          * +- ObjectHashAggregate(keys=[ItemKey#8], functions=[percentile(ItemValue#9, [0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14,0.15,0.16,0.17,0.18,0.19,0.2,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.3,0.31,0.32,0.33,0.34,0.35000000000000003,0.36,0.37,0.38,0.39,0.4,0.41000000000000003,0.42,0.43,0.44,0.45,0.46,0.47000000000000003,0.48,0.49,0.5,0.51,0.52,0.53,0.54,0.55,0.56,0.5700000000000001,0.58,0.59,0.6,0.61,0.62,0.63,0.64,0.65,0.66,0.67,0.68,0.6900000000000001,0.7000000000000001,0.71,0.72,0.73,0.74,0.75,0.76,0.77,0.78,0.79,0.8,0.81,0.8200000000000001,0.8300000000000001,0.84,0.85,0.86,0.87,0.88,0.89,0.9,0.91,0.92,0.93,0.9400000000000001,0.9500000000000001,0.96,0.97,0.98,0.99], 1, 0, 0)])
          * +- Exchange hashpartitioning(ItemKey#8, 2)
          * +- ObjectHashAggregate(keys=[ItemKey#8], functions=[partial_percentile(ItemValue#9, [0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14,0.15,0.16,0.17,0.18,0.19,0.2,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.3,0.31,0.32,0.33,0.34,0.35000000000000003,0.36,0.37,0.38,0.39,0.4,0.41000000000000003,0.42,0.43,0.44,0.45,0.46,0.47000000000000003,0.48,0.49,0.5,0.51,0.52,0.53,0.54,0.55,0.56,0.5700000000000001,0.58,0.59,0.6,0.61,0.62,0.63,0.64,0.65,0.66,0.67,0.68,0.6900000000000001,0.7000000000000001,0.71,0.72,0.73,0.74,0.75,0.76,0.77,0.78,0.79,0.8,0.81,0.8200000000000001,0.8300000000000001,0.84,0.85,0.86,0.87,0.88,0.89,0.9,0.91,0.92,0.93,0.9400000000000001,0.9500000000000001,0.96,0.97,0.98,0.99], 1, 0, 0)])
          * +- LocalTableScan [ItemKey#8, ItemValue#9]
          */
    

    You may want to consider approx_percentile for faster exectution

    df
          .groupBy("ItemKey")
          .agg(
            expr(s"approx_percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
              .as("percentile"))
          .withColumn("percentile", explode($"percentile"))
          .show(false)
    
        /**
          * +-------+----------+
          * |ItemKey|percentile|
          * +-------+----------+
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * |I1     |2.4       |
          * +-------+----------+
          * only showing top 20 rows
          */
    

    Please let me know if you see any issues