Tags: pyspark, apache-spark-mllib, kolmogorov-smirnov

Aggregating the Kolmogorov-Smirnov test in PySpark


Is there a way to apply the Kolmogorov-Smirnov (KS) test from the spark.mllib library in PySpark using a groupBy clause or some other method of aggregation? For example, I have a DataFrame df with columns ID and RESULT, like so:

+-------+------+
|     ID|RESULT|
+-------+------+
|3648296|  2.73|
|3648296|  9.64|
|3648189|  0.03|
|3648189|  0.03|
|3648296|  2.51|
|3648189|  0.01|
|3648296|  1.75|
|3648296| 30.23|
|3648189|  0.02|
|3648189|  0.02|
|3648189|  0.02|
|3648296|  3.28|
|3648296| 32.55|
|3648296|  2.32|
|3648296| 34.58|
|3648296| 29.22|
|3648189|  0.02|
|3648296|  1.36|
|3648296|  1.64|
|3648296|  1.17|
+-------+------+
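
A small DataFrame of this shape can be created for experimentation like so (assuming an active SparkSession named spark; only the first few rows are included):

rows = [
    (3648296, 2.73), (3648296, 9.64), (3648189, 0.03), (3648189, 0.03),
    (3648296, 2.51), (3648189, 0.01), (3648296, 1.75), (3648296, 30.23),
]
df = spark.createDataFrame(rows, ["ID", "RESULT"])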

There are two IDs, 3648296 and 3648189, and each has on the order of a few hundred thousand corresponding RESULT values. Is it possible to apply a groupBy function like so:

from pyspark.mllib.stat import Statistics

normtest=df.groupBy('ID').Statistics.kolmogorovSmirnovTest(df.RESULT, "norm", 0, 1)

such that I get an output DataFrame like:

+-------+---------+----------+
|     ID|p-value  |statistic |
+-------+---------+----------+
|3648296|some val | some val |
|3648189|some val | some val |
+-------+---------+----------+

Is this possible?


Solution

  • This can be solved by binning the data and then performing the Kolmogorov-Smirnov test on the binned data (i.e., on the histogram). It won't produce the exact supremum distance, but if the underlying distribution is smooth, the result should be close enough.

    By bucketing the results, you ensure that only a limited number of rows per ID (at most the number of buckets) is passed to the pandas UDF at a time.

    First, we need to implement a histogram-based version of the KS test:

    import numpy as np
    from scipy import stats

    def hist_kstest(hist: np.ndarray, bin_edges: np.ndarray, cdf):
        # Cumulative counts up to (and including) each bin, and total sample size.
        i = hist.cumsum()
        n = i[-1]

        # Evaluate the hypothesised CDF at the right edge of each bin.
        bin_right_edges = bin_edges[1:]
        cdf_vals = cdf(bin_right_edges)

        # Two-sided KS statistic from the binned empirical CDF,
        # followed by the p-value from the exact KS distribution.
        statistic = np.max([
            cdf_vals - (i - 1) / n,
            i / n - cdf_vals
        ])
        pvalue = stats.distributions.kstwo.sf(statistic, n)
        return statistic, pvalue
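
    As a quick local sanity check, hist_kstest should roughly agree with scipy.stats.kstest on the raw sample when the bins are fine enough, for example:

    import numpy as np
    from scipy import stats

    rng = np.random.default_rng(0)
    sample = rng.normal(loc=0.5, scale=1.2, size=100_000)

    # Binned version of the test vs. the exact test on the raw sample;
    # the statistics (and p-values) should be very close.
    hist, bin_edges = np.histogram(sample, bins=1_000)
    print(hist_kstest(hist, bin_edges, stats.norm(0, 1).cdf))
    print(stats.kstest(sample, stats.norm(0, 1).cdf))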
    

    Then use it as follows:

    from pyspark.sql import functions as F
    from pyspark.ml.feature import QuantileDiscretizer
    import pandas as pd
    import numpy as np
    from scipy import stats
    
    # Choose the number of buckets. It depends on your memory
    # availability and affects the accuracy of the test.
    num_buckets = 1_000
    
    # Choose the null hypothesis (H0)
    h0_cdf = stats.norm(0, 1).cdf
    
    # Bucket the result and get the buckets' edges
    bucketizer = QuantileDiscretizer(
        numBuckets=num_buckets, inputCol='RESULT', outputCol='result_bucket'
    ).setHandleInvalid("keep").fit(df)
    buckets = np.array(bucketizer.getSplits())
    
    def kstest(key, pdf: pd.DataFrame):
        pdf.sort_values('result_bucket', inplace=True)
        hist = pdf['count'].to_numpy()
        # Some of the buckets might not appear in all the groups, so
        # we filter buckets that are not available.
        bin_edges = buckets[[0, *(pdf['result_bucket'].to_numpy().astype(int) + 1)]]
        statistic, pvalue = hist_kstest(hist, bin_edges, h0_cdf)
        return pd.DataFrame([[*key, statistic, pvalue]])
    
    test_results = bucketizer.transform(df).groupBy("ID", "result_bucket").agg(
        F.count("*").alias("count")
    ).groupBy("ID").applyInPandas(kstest, "ID long, statistic double, pvalue double")
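
    The result is a small DataFrame with one row per ID, in the shape requested in the question (the actual values depend on the data):

    test_results.show()
    # +-------+---------+------+
    # |     ID|statistic|pvalue|
    # +-------+---------+------+
    # |3648296|      ...|   ...|
    # |3648189|      ...|   ...|
    # +-------+---------+------+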