Is there a way to apply the KS test from the spark.mllib library in PySpark using the groupBy clause or some method of aggregation?
For example, I have a dataframe df with columns ID and RESULT like so:
+-------+------+
| ID|RESULT|
+-------+------+
|3648296| 2.73|
|3648296| 9.64|
|3648189| 0.03|
|3648189| 0.03|
|3648296| 2.51|
|3648189| 0.01|
|3648296| 1.75|
|3648296| 30.23|
|3648189| 0.02|
|3648189| 0.02|
|3648189| 0.02|
|3648296| 3.28|
|3648296| 32.55|
|3648296| 2.32|
|3648296| 34.58|
|3648296| 29.22|
|3648189| 0.02|
|3648296| 1.36|
|3648296| 1.64|
|3648296| 1.17|
+-------+------+
There are 2 IDs, 3648296 and 3648189, and each has on the order of a few hundred thousand corresponding RESULT values.
Is it possible to apply a groupBy function like so:
from pyspark.mllib.stat import Statistics
normtest=df.groupBy('ID').Statistics.kolmogorovSmirnovTest(df.RESULT, "norm", 0, 1)
such that I get an output dataframe like:
+-------+---------+----------+
| ID|p-value |statistic |
+-------+---------+----------+
|3648296|some val | some val |
|3648189|some val | some val |
+-------+---------+----------+
Is this possible?
This can be solved by binning the data and then performing the Kolmogorov-Smirnov test on the binned data (i.e., a histogram). It won't produce the largest distance, but if your effective distribution is smooth, the result should be close enough.
By bucketing the results, you ensure that only a limited number of items (the number of buckets) will be loaded into memory at a time.
First, we need to implement a histogram version of the kstest:
import numpy as np
from scipy import stats

def hist_kstest(hist: np.ndarray, bin_edges: np.ndarray, cdf):
    # Cumulative counts per bin and the total sample size.
    i = hist.cumsum()
    n = i[-1]
    # Evaluate the hypothesized CDF at each bin's right edge.
    bin_right_edges = bin_edges[1:]
    cdf_vals = cdf(bin_right_edges)
    # Two-sided KS statistic: the largest deviation between the
    # empirical CDF (all observations treated as lying at their
    # bin's right edge) and the hypothesized CDF.
    statistic = np.max([
        cdf_vals - (i - 1) / n,
        i / n - cdf_vals
    ])
    # p-value from the distribution of the two-sided KS statistic.
    pvalue = stats.distributions.kstwo.sf(statistic, n)
    return statistic, pvalue
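As a quick sanity check (not part of the pipeline below), you can compare hist_kstest against scipy.stats.kstest on a single in-memory sample binned with np.histogram; with a reasonable number of bins the two should agree closely:

# Sanity check (illustrative only): compare the binned test with the exact one.
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
sample = rng.normal(loc=0.0, scale=1.0, size=100_000)

# Exact KS test on the raw sample.
exact = stats.kstest(sample, stats.norm(0, 1).cdf)

# Binned KS test on a 1,000-bin histogram of the same sample.
hist, bin_edges = np.histogram(sample, bins=1_000)
approx = hist_kstest(hist, bin_edges, stats.norm(0, 1).cdf)

print(exact.statistic, exact.pvalue)
print(approx)  # approximately the same statistic and p-value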
Then use it as follows:
from pyspark.sql import functions as F
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
from scipy import stats

# Choose the number of buckets. It depends on your memory
# availability and affects the accuracy of the test.
num_buckets = 1_000

# Choose the null hypothesis (H0) - here a standard normal distribution.
h0_cdf = stats.norm(0, 1).cdf

# Bucket the RESULT column and get the buckets' edges.
bucketizer = QuantileDiscretizer(
    numBuckets=num_buckets, inputCol='RESULT', outputCol='result_bucket'
).setHandleInvalid("keep").fit(df)

buckets = np.array(bucketizer.getSplits())

def kstest(key, pdf: pd.DataFrame):
    pdf.sort_values('result_bucket', inplace=True)
    hist = pdf['count'].to_numpy()
    # Some buckets might not appear in every group, so keep only the
    # edges of the buckets that are present (plus the global left edge).
    bin_edges = buckets[[0, *(pdf['result_bucket'].to_numpy().astype(int) + 1)]]
    statistic, pvalue = hist_kstest(hist, bin_edges, h0_cdf)
    return pd.DataFrame([[*key, statistic, pvalue]])

# Count the rows per (ID, bucket), then run the binned KS test per ID.
df = bucketizer.transform(df).groupBy("ID", "result_bucket").agg(
    F.count("*").alias("count")
).groupby("ID").applyInPandas(kstest, "ID long, statistic double, pvalue double")