I am trying to use StandardScaler from the Spark ML library on a dataframe that has columns with null values. I would like to retain the null values, but when I use StandardScaler with withMean=True, the mean of the columns with null values also becomes NaN. Is there any way to make the StandardScaler skip the null values for the mean calculation (like the handleInvalid option in VectorAssembler)?
Below is a code example:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sqlContext = SparkSession.builder.appName('test').config("spark.submit.deployMode","client").enableHiveSupport().getOrCreate()
test_df = sqlContext.createDataFrame([(1,2,None),(1,3,3),(1,4,8),(1,5,7),(1,6,8),
(1,7,1),(1,8,6),(1,9,9),(1,10,3),(1,11,12)],schema=['col1','col2','col3'])
#%%
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler,StandardScaler
from pyspark.ml import Pipeline,PipelineModel
assmbler = VectorAssembler(inputCols=['col2','col3'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
After this, if I try to get the mean values:
pipe_fit.stages[1].mean
Out[5]: DenseVector([6.5, nan])
As you can see, the mean of the second column is nan. Is there any way to avoid this?
The fit method of Spark's StandardScaler uses Summarizer.metrics("mean", "std")
to calculate the mean of a column:
val Row(mean: Vector, std: Vector) = dataset
.select(Summarizer.metrics("mean", "std").summary(col($(inputCol))).as("summary"))
.select("summary.mean", "summary.std")
.first()
The Summarizer class itself has no option to ignore null, NaN, or None values, so there is no built-in solution for the problem.
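This can be verified directly with the Python counterpart of that summarizer, pyspark.ml.stat.Summarizer (a minimal sketch, using df_res from the question):
from pyspark.ml.stat import Summarizer
# handleInvalid='keep' turns the null in col3 into NaN inside the vector,
# and the NaN propagates into the mean and std computed by the summarizer
df_res.select(Summarizer.metrics("mean", "std")
              .summary(F.col("col_vec")).alias("summary")) \
      .select("summary.mean", "summary.std") \
      .show(truncate=False)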
Two simple workarounds are to drop the rows that contain nulls or to replace the nulls with a fixed value before fitting the pipeline:
test_df = test_df.filter("col2 is not null and col3 is not null")
or
test_df = test_df.fillna(0) # or any other value that is appropriate for the task
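After either of these, the pipeline from the question can be fitted as before (a sketch reusing the pipeline defined above):
pipe_fit = pipeline.fit(test_df)  # no nulls remain, so the mean is no longer NaN
pipe_fit.stages[1].mean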
Another option is to add an Imputer to the pipeline to replace missing values with the mean, the median, or the most frequent value of the feature:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['col2', 'col3'], outputCols=['col2i', 'col3i'])
assmbler = VectorAssembler(inputCols=['col2i','col3i'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[imputer,assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
pipe_fit.stages[2].mean
now returns
DenseVector([6.5, 6.3])
as the missing value in col3 has been replaced with the mean value of this column.
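To double-check, one can compare the original and the imputed column (col3i is the imputer output column defined above):
# the row with a null in col3 now carries the column mean in col3i
df_res.select('col3', 'col3i').show()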
With the strategy parameter, the median or the most frequent value can be used instead of the mean; the mean is the default.
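A median-based variant would look like this (a sketch; only the strategy argument differs from the imputer above):
imputer = Imputer(inputCols=['col2', 'col3'], outputCols=['col2i', 'col3i'],
                  strategy='median')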
Using the standard Spark SQL functions mean and stddev, it is possible to implement logic similar to the StandardScaler. Both SQL functions simply ignore None values when aggregating.
cols = ['col2', 'col3']  # the columns that should be scaled

# mean and stddev ignore nulls, so the statistics are based on the non-null values only
mean_and_std_cols = [c for col in cols
                     for c in (F.mean(col).alias(f"{col}_mean"),
                               F.stddev(col).alias(f"{col}_std"))]
mean_and_std = test_df.select(mean_and_std_cols).first()

# (value - mean) / std; rows with a null keep a null in the scaled column
scaled_cols = [((F.col(col) - mean_and_std[f"{col}_mean"])
                / mean_and_std[f"{col}_std"]).alias(f"{col}_s") for col in cols]
test_df = test_df.select(test_df.columns + scaled_cols)
This logic adds two columns, col2_s and col3_s, to the dataframe that contain the scaled values. mean_and_std contains the actual values for mean and std:
Row(col2_mean=6.5, col2_std=3.0276503540974917, col3_mean=6.333333333333333, col3_std=3.4641016151377544)
The newly created columns col2_s and col3_s can now be used as input columns for the VectorAssembler:
assmbler = VectorAssembler(inputCols=['col2_s','col3_s'],outputCol='col_vec',handleInvalid='keep')
pipeline = Pipeline(stages=[assmbler])
pipe_fit= ...
On large datasets this option might be a bit slower than the original scaler, as the values for mean and std are not approximated but calculated exactly.
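As a final check that the original requirement is met: rows with nulls keep a null in the scaled columns, because arithmetic with null yields null in Spark SQL (assuming test_df is the dataframe with the scaled columns from above):
# the row where col3 is null also has col3_s = null
test_df.select('col3', 'col3_s').show()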