pyspark, apache-spark-sql, apache-spark-ml

PySpark StandardScaler - excluding null values for mean calculation


I am trying to use StandardScaler from the Spark ML library on a dataframe with columns that contain null values. I would like to retain the null values, but when I use StandardScaler with withMean=True, the mean of the columns with null values also becomes NaN. Is there any way to make StandardScaler skip the null values for the mean calculation (like the handleInvalid option in VectorAssembler)?

Below is a code example:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sqlContext = SparkSession.builder.appName('test') \
    .config("spark.submit.deployMode", "client") \
    .enableHiveSupport().getOrCreate()

test_df = sqlContext.createDataFrame(
    [(1, 2, None), (1, 3, 3), (1, 4, 8), (1, 5, 7), (1, 6, 8),
     (1, 7, 1), (1, 8, 6), (1, 9, 9), (1, 10, 3), (1, 11, 12)],
    schema=['col1', 'col2', 'col3'])

from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline, PipelineModel

assembler = VectorAssembler(inputCols=['col2', 'col3'], outputCol='col_vec', handleInvalid='keep')
sclr = StandardScaler(withMean=True, inputCol='col_vec', outputCol='col_scaled')
pipeline = Pipeline(stages=[assembler, sclr])
pipe_fit = pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)

After this, if I try to get the mean values:

pipe_fit.stages[1].mean
Out[5]: DenseVector([6.5, nan])

As you can see, the mean of the second column is NaN. Is there any way to avoid this?


Solution

  • The fit method of Spark's StandardScaler uses Summarizer.metrics("mean", "std") to calculate the mean and standard deviation of the input column:

    val Row(mean: Vector, std: Vector) = dataset
      .select(Summarizer.metrics("mean", "std").summary(col($(inputCol))).as("summary"))
      .select("summary.mean", "summary.std")
      .first()
    

    The Summarizer class itself has no option to ignore null or NaN/None values, so there is no built-in solution for the problem.
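
    To see the limitation directly, here is a minimal sketch that calls the Summarizer on the vectors assembled in the question (df_res from above); handleInvalid='keep' maps the null in col3 to NaN inside the vector, and that NaN propagates into the mean:

    from pyspark.ml.stat import Summarizer

    # df_res is the transformed dataframe from the question; col_vec contains a
    # NaN where col3 was null, so the mean of that vector slot is NaN as well.
    df_res.select(Summarizer.metrics("mean", "std")
                  .summary(F.col("col_vec")).alias("summary")) \
          .select("summary.mean", "summary.std") \
          .show(truncate=False)
    # expected to show a mean of roughly [6.5, NaN], matching pipe_fit.stages[1].mean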

    There are several options to handle the issue:

    Filter out the None values before fitting the pipeline

    test_df = test_df.filter("col2 is not null and col3 is not null")
    

    Replace missing values with a constant value

    test_df = test_df.fillna(0)  # or any other value that is appropriate for the task
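
    If different columns should get different replacement values, fillna also accepts a dict; a small sketch with arbitrary example constants:

    # hypothetical per-column replacements; pick values that make sense for the data
    test_df = test_df.fillna({'col2': 0, 'col3': -1})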
    

    Using an Imputer

    Add an Imputer to the pipeline to replace missing values with either the mean, the median or the most frequent value of the feature:

    from pyspark.ml.feature import Imputer

    imputer = Imputer(inputCols=['col2', 'col3'], outputCols=['col2i', 'col3i'])
    assembler = VectorAssembler(inputCols=['col2i', 'col3i'], outputCol='col_vec', handleInvalid='keep')
    sclr = StandardScaler(withMean=True, inputCol='col_vec', outputCol='col_scaled')
    pipeline = Pipeline(stages=[imputer, assembler, sclr])
    pipe_fit = pipeline.fit(test_df)
    df_res = pipe_fit.transform(test_df)
    

    pipe_fit.stages[2].mean now returns

    DenseVector([6.5, 6.3])
    

    as the missing value in col3 has been replaced with the mean of the column's nine non-null values (57/9 ≈ 6.3), which leaves the overall mean unchanged.

    With the strategy parameter, the median or the most frequent value can be used instead of the mean; the mean is the default.
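
    A minimal sketch of the strategy parameter, using the same column names as above:

    # strategy='median' replaces missing values with the column median instead of
    # the mean; 'mode' (the most frequent value) is available in newer Spark releases.
    imputer = Imputer(inputCols=['col2', 'col3'],
                      outputCols=['col2i', 'col3i'],
                      strategy='median')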

    Scale the required columns without a StandardScaler

    Using the standard Spark SQL functions mean and stddev, it is possible to implement logic similar to the StandardScaler's. Both aggregate functions ignore null values.

    # compute mean and stddev for each column in a single aggregation
    cols = ['col2', 'col3']  # the columns that should be scaled
    mean_and_std_cols = [c for col in cols for c in
                         (F.mean(col).alias(f"{col}_mean"), F.stddev(col).alias(f"{col}_std"))]
    mean_and_std = test_df.select(mean_and_std_cols).first()
    # build the scaled columns (x - mean) / std; null inputs stay null
    scaled_cols = [((F.col(col) - mean_and_std[f"{col}_mean"])
                    / mean_and_std[f"{col}_std"]).alias(f"{col}_s") for col in cols]
    test_df = test_df.select(test_df.columns + scaled_cols)
    

    This adds the two columns col2_s and col3_s, containing the scaled values, to the dataframe. mean_and_std contains the actual values for mean and std:

    Row(col2_mean=6.5, col2_std=3.0276503540974917, col3_mean=6.333333333333333, col3_std=3.4641016151377544)
    

    The newly created columns col2_s and col3_s can now be used as input columns for the VectorAssembler:

    assembler = VectorAssembler(inputCols=['col2_s', 'col3_s'], outputCol='col_vec', handleInvalid='keep')
    pipeline = Pipeline(stages=[assembler])
    pipe_fit = ...
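
    A minimal sketch of how the remaining fit and transform steps might look; since col2_s and col3_s are already centered and scaled, the StandardScaler stage is no longer needed:

    pipe_fit = pipeline.fit(test_df)
    df_res = pipe_fit.transform(test_df)
    # col_vec now holds the pre-scaled values; rows where col3 was null keep a NaN
    # in that slot (handleInvalid='keep'), while the other rows are scaled correctly
    # because mean and stddev ignored the nulls.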
    

    This option might be a bit slower on large datasets than the original scaler, as the values for mean and std are computed in a separate aggregation and collected to the driver before the scaled columns are added.