Tags: apache-spark, percentile, approximation

Computation of approximate percentiles


When using Spark's percentile_approx function to calculate approximate percentiles in declarative SQL, sometimes grouped, I observe that this function is painfully slow. I have already reduced the accuracy to 100 (about 5 minutes are required for the aggregation) or sometimes 1000 (20-30 minutes), even though 1000 is already 10x lower than the default accuracy of 10,000.
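For context, a minimal sketch of the kind of grouped aggregation I mean (the DataFrame df and the columns day and value are hypothetical stand-ins):

    import org.apache.spark.sql.functions.expr

    // hypothetical input: one `value` row per measurement, one group per `day`;
    // the third argument lowers the accuracy from the default of 10000 to 100
    val approx = df.groupBy("day")
      .agg(expr("percentile_approx(value, 0.5, 100)").as("median_approx"))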

I observe that the resulting percentiles roughly match, but when really going into the details and calculating them for many groups, i.e. one per day, they do not match well at all. In fact, when pre-aggregating the data as much as possible and keeping only the numeric columns (i.e. dropping any memory-intensive non-numeric columns), it is possible to use a simple pandas median, which is 1) exact and 2) faster than Spark.

Am I choosing the accuracy too low? Even 1000 already takes very long to compute (I have >> 1 aggregation), so 5 vs. 25 minutes per aggregation multiplies quickly.

How can it be that pandas is so fast? Due to vectorization?

What are suitable parameters here with regard to the speed/accuracy trade-off?

Would a t-digest (https://github.com/tdunning/t-digest) be a better alternative?


Solution

  • As long as the state per key is small enough, I will apply the following code to calculate percentiles using a UDF. As this would require an updated version of breeze (which might complicate things or have some side effects), I will copy/paste some parts of breeze.

    val r = scala.util.Random
    val numbers = for (i <- 0 to 20) yield (r.nextDouble)
    
    // in reality, Spark's sort_array(collect_list()) already guarantees a pre-sorted array
    val sortedNumbers = numbers.sorted
    

    //https://github.com/scalanlp/breeze/blob/master/math/src/main/scala/breeze/stats/DescriptiveStats.scala#L537

    /**
     * Returns the estimate of a pre-sorted array at p * it.size, where p in [0,1].
     * <p>
     * Note:
     * Result is invalid if the input array is not already sorted.
     * </p>
     */
    def percentileInPlace(arr: Array[Double], p: Double): Double = {
      if (p > 1 || p < 0) throw new IllegalArgumentException("p must be in [0,1]")
      // +1 so that the .5 == mean for even number of elements.
      val f = (arr.length + 1) * p
      val i = f.toInt
      if (i == 0) arr.head
      else if (i >= arr.length) arr.last
      else {
        arr(i - 1) + (f - i) * (arr(i) - arr(i - 1))
      }
    }
    
    percentileInPlace(sortedNumbers.toArray, 0.4)
    percentileInPlace(sortedNumbers.toArray, 0.5)
    percentileInPlace(sortedNumbers.toArray, 0.6)
    

    This can easily be wrapped in a UDF to calculate various percentiles and return an array of multiple percentiles if required, as sketched below.
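    A minimal sketch of the wiring (the DataFrame df with columns day and value is the same hypothetical example as above):

    import org.apache.spark.sql.functions.{col, collect_list, sort_array, udf}

    // collect and pre-sort the values per group once, then evaluate
    // several percentiles in a single pass over the sorted array
    val percentilesUdf = udf { (sorted: Seq[Double]) =>
      val arr = sorted.toArray
      Seq(0.25, 0.5, 0.75).map(p => percentileInPlace(arr, p))
    }

    val result = df
      .groupBy("day")
      .agg(sort_array(collect_list(col("value"))).as("sorted"))
      .select(col("day"), percentilesUdf(col("sorted")).as("percentiles"))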

    NOTE: use .asNondeterministic() on the UDF when you plan to return > 1 value from it, to save time. Otherwise Spark may, due to Catalyst optimization, recompute the collect_list/sort_array and the UDF separately for each percentile when multiple output columns (= struct fields) are extracted.
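    Applied to the hypothetical sketch above, that looks like this:

    // marking the UDF nondeterministic discourages Catalyst from re-evaluating
    // it (and the collect_list/sort_array feeding it) once per extracted element
    val nd = percentilesUdf.asNondeterministic()

    val wide = df
      .groupBy("day")
      .agg(sort_array(collect_list(col("value"))).as("sorted"))
      .withColumn("p", nd(col("sorted")))
      .select(col("day"), col("p")(0).as("p25"), col("p")(1).as("p50"), col("p")(2).as("p75"))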