Tags: function, apache-spark, machine-learning, logarithm

How to rescale a range of numbers around a shifted centre in Spark/Scala?


Which function in Spark can rescale values from an arbitrary range (for example -infinity to +infinity, or -2 to 130) to a range whose maximum I define?

In the example below, I want a score of 55 to map to 100, and scores of 100 or more to map to 0.

before      | after
45-55       | 90-100
35-44       | 80-89
...
100+ or < 0 | 0-5

Are any of the ML feature transformers useful for this?


Solution

  • I was able to solve it; thanks @user6910411 for your help. The idea is to split the data at the required median (the centre point, 55 here), negate the scores in the upper half so that its scale is reversed, scale both halves to 0-100, and then merge the two DataFrames. Depending on the data, you can use dense or sparse vectors, replace MinMaxScaler with MaxAbsScaler, and extract the values using linalg.Vectors or DenseVector.

    import org.apache.spark.ml.feature.MaxAbsScaler
    import org.apache.spark.ml.feature.MinMaxScaler
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.linalg.DenseVector
    import org.apache.spark.sql.functions.{col, lit, round, udf}
    // Needed for the 'id / 'score column syntax outside spark-shell;
    // assumes the SparkSession is named `spark`.
    import spark.implicits._
    
    val vectorToColumn = udf{ (x: DenseVector, index: Int) => x(index) }
    
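    // Upper half (score >= 55): negate so that larger scores scale to smaller values.
    val gt50 = df.filter("score >= 55").select('id,('score * -1).as("score"))
    // Lower half (score < 55): scaled as-is.
    val lt50 = df.filter("score < 55")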
    
    val assembler = new VectorAssembler()
    .setInputCols(Array("score"))
    .setOutputCol("features")
    
    val ass_lt50 = assembler.transform(lt50)
    val ass_gt50 = assembler.transform(gt50)
    
    val scaler = new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("featuresScaled")
    .setMax(100)
    .setMin(0)
    
    val feat_lt50 = scaler.fit(ass_lt50).transform(ass_lt50).drop('score)
    val feat_gt50 = scaler.fit(ass_gt50).transform(ass_gt50).drop('score)
    
    val scaled_lt50 = feat_lt50.select('id,round(
    vectorToColumn(col("featuresScaled"),lit(0))).as("scaled_score"))
    
    val scaled_gt50 = feat_gt50.select('id,round(
    vectorToColumn(col("featuresScaled"),lit(0))).as("scaled_score"))
    
    // union replaces unionAll, which is deprecated in Spark 2.x; columns are matched by position.
    val scaled = scaled_lt50.union(scaled_gt50)
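
The arithmetic behind the split-and-reverse idea above can be checked without Spark. The following is only a minimal sketch in plain Scala: the object `CentredRescale` and its parameters `lo`, `centre` and `hi` are hypothetical names, not part of the original answer. Unlike the MinMaxScaler version, it takes the bounds as arguments instead of fitting them from the data, it anchors both halves at the centre itself, and it clamps out-of-range values to the nearest bound.

```scala
object CentredRescale {
  /** Maps x to 0-100 so that `centre` gives 100 and both `lo` and `hi` give 0.
    * Assumption: values outside [lo, hi] are clamped to the nearest bound.
    */
  def rescale(x: Double, lo: Double, centre: Double, hi: Double): Double = {
    require(lo < centre && centre < hi, "expected lo < centre < hi")
    val clamped = math.max(lo, math.min(hi, x))
    if (clamped <= centre)
      (clamped - lo) / (centre - lo) * 100.0   // lower half: lo -> 0, centre -> 100
    else
      (hi - clamped) / (hi - centre) * 100.0   // upper half, reversed: centre -> 100, hi -> 0
  }
}
```

With bounds 0 and 100 and a centre of 55, `CentredRescale.rescale(55, 0, 55, 100)` gives 100.0, while 130 and -2 both give 0.0. Because the slope on each side depends on the chosen bounds, this will not reproduce the exact "45-55 maps to 90-100" bands from the question unless the bounds are picked accordingly.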