Tags: machine-learning, pyspark, regression, apache-spark-mllib, apache-spark-ml

Estimate a numerical value through Spark MLlib Regression


I'm training a Spark MLlib linear regressor, but I believe I have misunderstood part of the library's hands-on usage.

I have one feature (NameItem) and one output (Accumulator). The first is categorical (Speed, Temp, etc.); the second is numerical, of type double.

The training set consists of several million entries, and they are not linearly correlated (I checked with a heatmap and correlation coefficients).

Issue: I'd like to estimate the Accumulator value given the NameItem value through linear regression, but I think that is not what I'm actually doing.

Question: How can I do it?

I first split the dataset into a training set and a test set:

(trainDF, testDF) = df.randomSplit((0.80, 0.20), seed=42)

After that I tried a pipeline approach, as most tutorials show:

1) I indexed NameItem

indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")

2) Then I encoded it

encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)

3) And also assembled it

assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")

After that I continued with the actual training:

lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(trainDF)

This is what I obtain when I apply the model to the test set:

predictions = lrModel.transform(testDF)  # keep the DataFrame; show() itself returns None
predictions.show(5, False)
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|NameItem      |Accumulator      |CategorizedItem|EncodedItem      |features                       |prediction        |
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|Speed         |44000.00000000   |265.0          |(688,[265],[1.0])|(689,[265,688],[1.0,44000.0])  |44000.100892495786|
|Speed         |245000.00000000  |265.0          |(688,[265],[1.0])|(689,[265,688],[1.0,245000.0]) |245000.09963708033|
|Temp          |4473860.00000000 |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,4473860.0]) |4473859.874261986 |
|Temp          |6065.00000000    |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,6065.0])    |6065.097757082314 |
|Temp          |10140.00000000   |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,10140.0])   |10140.097731630483|
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
only showing top 5 rows

How can it be possible that for the same categorical feature (for example Temp) I get 3 different predictions?

Even though they are very close to the expected value, I feel there's something wrong.


Solution

  • How can it be possible that for the same categorical feature (for example Temp) I get 3 different predictions?

    It's because somehow your output Accumulator has found its way into features (which of course should not be the case), so the model just "predicts" (essentially copies) this part of the input; that's why the predictions are so "accurate"...
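
    The leak can be spotted directly in the printed output. The following is a minimal plain-Python sketch (no Spark needed) that takes the five rows shown in the question, written as (label, size, {index: value}), and looks for feature slots whose value equals the label in every row. The helper name leaked_indices is hypothetical and used only for illustration.

```python
# Rows copied from the question's output: (Accumulator, vector size, {index: value})
rows = [
    (44000.0, 689, {265: 1.0, 688: 44000.0}),
    (245000.0, 689, {265: 1.0, 688: 245000.0}),
    (4473860.0, 689, {66: 1.0, 688: 4473860.0}),
    (6065.0, 689, {66: 1.0, 688: 6065.0}),
    (10140.0, 689, {66: 1.0, 688: 10140.0}),
]


def leaked_indices(rows):
    """Return the feature indices whose value equals the label in every row."""
    candidates = None
    for label, _size, sparse in rows:
        matches = {i for i, v in sparse.items() if v == label}
        candidates = matches if candidates is None else candidates & matches
    return sorted(candidates or [])


print(leaked_indices(rows))  # [688]
```

    Index 688 is the one slot beyond the 688 one-hot positions of EncodedItem, and it carries the Accumulator value in every row shown.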

    Seems like the VectorAssembler messes things up. Thing is, you don't really need a VectorAssembler here, since in fact you only have a "single" feature (the one-hot encoded sparse vector in EncodedItem). This might be the reason why VectorAssembler behaves like that here (it is asked to "assemble" a single feature), but in any case this would be a bug.
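
    For reference, VectorAssembler is documented to concatenate its input columns, in order, into one vector. Below is a rough plain-Python sketch of that concatenation on sparse vectors written as (size, indices, values); the function assemble is a hypothetical stand-in for illustration, not Spark's implementation.

```python
def assemble(parts):
    """Concatenate numbers and sparse vectors (size, indices, values) in order."""
    size, indices, values = 0, [], []
    for part in parts:
        if isinstance(part, tuple):
            part_size, part_indices, part_values = part
            # Shift the indices of this vector by the slots already used.
            indices += [size + i for i in part_indices]
            values += list(part_values)
            size += part_size
        else:
            # A plain number occupies exactly one slot; zeros stay implicit.
            if part != 0:
                indices.append(size)
                values.append(float(part))
            size += 1
    return (size, indices, values)


encoded = (688, [265], [1.0])
print(assemble([encoded]))  # (688, [265], [1.0])
```

    With a single vector as input, the assembled result equals the input. For comparison, a 689-slot vector with the label at index 688 is the shape that concatenating the 688-slot vector with one extra numeric column would give, so it may also be worth checking which columns the assembler actually received.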

    So what I suggest is to get rid of the VectorAssembler, and rename the EncodedItem directly as features, i.e.:

    indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
    
    encoderInput = [indexer.getOutputCol()]
    encoderOutput = ["features"]  # 1st change
    encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
    
    lr = LinearRegression(labelCol="Accumulator")
    pipeline = Pipeline(stages=[indexer, encoder, lr])  # 2nd change
    lrModel = pipeline.fit(trainDF)
    

    UPDATE (after feedback in the comments)

    "My Spark version is 1.4.4"

    Unfortunately I cannot reproduce the issue, simply because I do not have access to Spark 1.4.4, which you are using. But I have confirmed that the pipeline works as expected in the most recent version of Spark, 2.4.4, which makes me even more inclined to believe that there was indeed a bug back in v1.4 that has since been resolved.

    Here is the same pipeline run in Spark 2.4.4, using some dummy data resembling yours:

    spark.version
    # '2.4.4'
    
    from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexer
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml import Pipeline
    
    # dummy data resembling yours:
    
    df = spark.createDataFrame([['Speed', 44000], 
                                ['Temp', 23000], 
                                ['Temp', 5000], 
                                ['Speed', 75000], 
                                ['Weight', 5300], 
                                ['Height', 34500], 
                                ['Weight', 6500]], 
                                ['NameItem', 'Accumulator'])
    
    df.show()
    # result:
    # +--------+-----------+
    # |NameItem|Accumulator|
    # +--------+-----------+
    # |   Speed|      44000|
    # |    Temp|      23000|
    # |    Temp|       5000|
    # |   Speed|      75000|
    # |  Weight|       5300|
    # |  Height|      34500|
    # |  Weight|       6500|
    # +--------+-----------+
    
    indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
    
    encoderInput = [indexer.getOutputCol()]
    encoderOutput = ["EncodedItem"]
    encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
    
    assemblerInput = encoderOutput
    assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")
    
    lr = LinearRegression(labelCol="Accumulator")
    
    pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
    lrModel = pipeline.fit(df) 
    lrModel.transform(df).show() # predicting on the same df, for simplicity
    

    The result of the last transform is

    +--------+-----------+---------------+-------------+-------------+------------------+
    |NameItem|Accumulator|CategorizedItem|  EncodedItem|     features|        prediction|
    +--------+-----------+---------------+-------------+-------------+------------------+
    |   Speed|      44000|            2.0|(4,[2],[1.0])|(4,[2],[1.0])|           59500.0|
    |    Temp|      23000|            1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
    |    Temp|       5000|            1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|
    |   Speed|      75000|            2.0|(4,[2],[1.0])|(4,[2],[1.0])|           59500.0|
    |  Weight|       5300|            0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|
    |  Height|      34500|            3.0|(4,[3],[1.0])|(4,[3],[1.0])|           34500.0|
    |  Weight|       6500|            0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|
    +--------+-----------+---------------+-------------+-------------+------------------+
    

    from which you can see that:

    1. The features no longer include the values of the output variable Accumulator, as should be the case; in fact, as I argued above, features is now identical to EncodedItem, which makes the VectorAssembler redundant, exactly as we should expect with only a single feature.
    2. The predictions are now identical for identical values of NameItem, again as we would expect; they are also less accurate and thus more realistic.
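
    The predictions in the table can be cross-checked without Spark. Assuming the default, unregularized least-squares fit, a model whose only input is a single one-hot encoded category can do no better than predict the mean Accumulator of each category. A minimal sketch on the same dummy data (the helper category_means is hypothetical):

```python
from collections import defaultdict

# Same dummy data as in the Spark example above.
data = [("Speed", 44000), ("Temp", 23000), ("Temp", 5000), ("Speed", 75000),
        ("Weight", 5300), ("Height", 34500), ("Weight", 6500)]


def category_means(rows):
    """Return the mean value per category name."""
    totals = defaultdict(lambda: [0.0, 0])  # name -> [running sum, count]
    for name, value in rows:
        totals[name][0] += value
        totals[name][1] += 1
    return {name: total / count for name, (total, count) in totals.items()}


means = category_means(data)
for name in sorted(means):
    print(name, means[name])
# Height 34500.0
# Speed 59500.0
# Temp 14000.0
# Weight 5900.0
```

    These means agree with the prediction column up to floating-point noise, which is why every row with the same NameItem gets the same prediction.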

    So, most certainly, your issue has to do with the vastly outdated Spark version 1.4.4 you are using. Spark has made leaps since v1.4, and you should seriously consider updating...