In my DataFrame, some columns contain continuous values, while other columns contain only 0/1 values. I want to apply StandardScaler to the continuous columns before logistic regression, inside a Pipeline. How can I implement this?
I try:
from pyspark.ml.feature import VectorAssembler,StandardScaler
from pyspark.ml import Pipeline,Transformer
from pyspark.sql.functions import udf,col
from pyspark.sql.types import FloatType, ArrayType
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
class StandardScalerSubset(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """
    A custom Transformer that applies StandardScaler to a subset of columns.

    The continuous columns listed in ``to_scale_cols`` are assembled into a
    single vector, standardized (zero mean, unit variance), and unpacked back
    into individual columns named after the originals; the columns in
    ``remaining_cols`` pass through unchanged.

    NOTE(review): the scaler is re-fit inside ``_transform``, so mean/std are
    recomputed on every dataset this transformer sees — at prediction time the
    statistics come from the *new* data, not the training data. To persist
    fitted statistics (and make the pipeline loadable after saving), implement
    an Estimator/Model pair instead of a plain Transformer.
    """

    def __init__(self, to_scale_cols, remaining_cols):
        super(StandardScalerSubset, self).__init__()
        self.to_scale_cols = to_scale_cols    # continuous columns to be scaled
        self.remaining_cols = remaining_cols  # columns passed through untouched

    def _transform(self, data):
        """Return ``data`` with ``to_scale_cols`` standardized, keeping ``remaining_cols``."""
        va = VectorAssembler().setInputCols(self.to_scale_cols).setOutputCol("to_scale_vector")
        data_va = va.transform(data)
        scaler = StandardScaler(inputCol="to_scale_vector", outputCol="scaled_vector",
                                withMean=True, withStd=True)
        scaler_model = scaler.fit(data_va)
        data_scaled = scaler_model.transform(data_va)
        # Vector columns cannot be indexed element-wise in a select; convert to an
        # array column first so .getItem(i) works.
        vector2list = udf(lambda x: x.toArray().tolist(), ArrayType(FloatType()))
        # BUG FIX: the original referenced self.scale_cols, which is never set —
        # the constructor stores the list as self.to_scale_cols.
        data_res = data_scaled.withColumn("scaled_list", vector2list("scaled_vector")) \
            .select(self.remaining_cols
                    + [col("scaled_list").getItem(i).alias(c)
                       for (i, c) in enumerate(self.to_scale_cols)])
        return data_res
For input:
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 5| 10| 0|
# | 0| 10| 20| 1|
# | 1| 15| 25| 0|
# | 0| 30| 40| 1|
# +---+---+---+---+
The output would be:
# +---+---+--------+-----+
# | a| d| b| c|
# +---+---+--------+-----+
# | 1| 0| -0.9258| -1.1|
# | 0| 1| -0.4629| -0.3|
# | 1| 0| 0.0| 0.1|
# | 0| 1| 1.3887| 1.3|
# +---+---+--------+-----+
It is then used like this:
scalerFeatures = ['xxx']      # continuous columns to standardize
featureArr = ['xxx']          # final assembled feature columns
remainingFeatures = ['xxx']   # binary 0/1 columns, passed through as-is
# FIX: keyword must match the __init__ signature (to_scale_cols, not scale_cols).
sss = StandardScalerSubset(to_scale_cols=scalerFeatures, remaining_cols=remainingFeatures)
# FIX: the list above was named featureAr but used here as featureArr — use one name.
vectorAssembler = VectorAssembler().setInputCols(featureArr).setOutputCol("features")
lrModel = LogisticRegression(featuresCol="features", regParam=0.1, maxIter=100, family="binomial")
# FIX: the stage list referenced an undefined name modelObject; the model is lrModel.
pipeline = Pipeline().setStages([sss, vectorAssembler, lrModel])
pipeline.fit(trainData).write().overwrite().save(modelSavePath)
When I use PipelineModel.load(modelSavePath) to load the model, I get an error. I think I need to implement both fit and transform, but I don't know how to do that. Can anyone help me? Thanks.
Too long for a comment, but here's what you can try:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
scalerFeatures = ['b', 'c']     # continuous columns that need standardizing
remainingFeatures = ['a', 'd']  # binary columns, used as-is
# Final feature order: pass-through columns first, then the scaled ones.
featureArr = remainingFeatures + ['scaled_' + f for f in scalerFeatures]

# Build one single-column assembler plus one scaler per continuous feature,
# so each column is standardized independently inside the pipeline itself
# (and the fitted statistics are saved/loaded with the PipelineModel).
va1 = []
ss = []
for f in scalerFeatures:
    va1.append(VectorAssembler(inputCols=[f], outputCol='vec_' + f))
    ss.append(StandardScaler(inputCol='vec_' + f, outputCol='scaled_' + f,
                             withMean=True, withStd=True))

va2 = VectorAssembler(inputCols=featureArr, outputCol="features")
lr = LogisticRegression()

stages = va1 + ss + [va2]
# I don't have a label column, but if you do, you can put lr stage at the end:
# stages = va1 + ss + [va2, lr]
p = Pipeline(stages=stages)
p.fit(df).transform(df).show()
+---+---+---+---+------+------+---------------------+----------------------+--------------------------------------------------+
|a |b |c |d |vec_b |vec_c |scaled_b |scaled_c |features |
+---+---+---+---+------+------+---------------------+----------------------+--------------------------------------------------+
|1 |5 |10 |0 |[5.0] |[10.0]|[-0.9258200997725514]|[-1.0999999999999999] |[1.0,0.0,-0.9258200997725514,-1.0999999999999999] |
|0 |10 |20 |1 |[10.0]|[20.0]|[-0.4629100498862757]|[-0.29999999999999993]|[0.0,1.0,-0.4629100498862757,-0.29999999999999993]|
|1 |15 |25 |0 |[15.0]|[25.0]|[0.0] |[0.09999999999999998] |[1.0,0.0,0.0,0.09999999999999998] |
|0 |30 |40 |1 |[30.0]|[40.0]|[1.3887301496588271] |[1.2999999999999998] |[0.0,1.0,1.3887301496588271,1.2999999999999998] |
+---+---+---+---+------+------+---------------------+----------------------+--------------------------------------------------+