Tags: apache-spark, pyspark, apache-spark-mllib, logistic-regression, apache-spark-ml

Issues with Logistic Regression for multiclass classification using PySpark


I am trying to use Logistic Regression to classify a dataset whose feature column holds SparseVector values.

For full code base and error log, please check my github repo

Case 1: I tried using an ML pipeline as follows:

# imported library from ML
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

print(type(trainingData))    # check the dataset type
print(trainingData.take(2))  # inspect the first two records
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=maximumIteration, regParam=regParamValue)
pipeline = Pipeline(stages=[lr])
# Train model
model = pipeline.fit(trainingData)

Got the following error:

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(label=2.0, features=SparseVector(2000, {51: 1.0, 160: 1.0, 341: 1.0, 417: 1.0, 561: 1.0, 656: 1.0, 863: 1.0, 939: 1.0, 1021: 1.0, 1324: 1.0, 1433: 1.0, 1573: 1.0, 1604: 1.0, 1720: 1.0})), Row(label=3.0, features=SparseVector(2000, {24: 1.0, 51: 2.0, 119: 1.0, 167: 1.0, 182: 1.0, 190: 1.0, 195: 1.0, 285: 1.0, 432: 1.0, 539: 1.0, 571: 1.0, 630: 1.0, 638: 1.0, 656: 1.0, 660: 2.0, 751: 1.0, 785: 1.0, 794: 1.0, 801: 1.0, 823: 1.0, 893: 1.0, 900: 1.0, 915: 1.0, 956: 1.0, 966: 1.0, 1025: 1.0, 1029: 1.0, 1035: 1.0, 1038: 1.0, 1093: 1.0, 1115: 2.0, 1147: 1.0, 1206: 1.0, 1252: 1.0, 1261: 1.0, 1262: 1.0, 1268: 1.0, 1304: 1.0, 1351: 1.0, 1378: 1.0, 1423: 1.0, 1437: 1.0, 1441: 1.0, 1530: 1.0, 1534: 1.0, 1556: 1.0, 1562: 1.0, 1604: 1.0, 1711: 1.0, 1737: 1.0, 1750: 1.0, 1776: 1.0, 1858: 1.0, 1865: 1.0, 1923: 1.0, 1926: 1.0, 1959: 1.0, 1999: 1.0}))]
16/08/25 19:14:07 ERROR org.apache.spark.ml.classification.LogisticRegression: Currently, LogisticRegression with ElasticNet in ML package only supports binary classification. Found 5 in the input dataset.
Traceback (most recent call last):
  File "/home/LR/test.py", line 260, in <module>
    accuracy = TrainLRCModel(trainData, testData)
  File "/home/LR/test.py", line 211, in TrainLRCModel
    model = pipeline.fit(trainingData)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 69, in fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 213, in _fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 69, in fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 133, in _fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 130, in _fit_java
  File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o207.fit.
: org.apache.spark.SparkException: Currently, LogisticRegression with ElasticNet in ML package only supports binary classification. Found 5 in the input dataset.
        at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:290)
        at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)

Case 2: I searched for an alternative and found that LogisticRegressionWithLBFGS supports multi-class classification, so I tried the following:

#imported library
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
print(type(trainingData)) # to check the dataset type
print(trainingData.take(2)) # To see the data
model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
print(type(model))

Got the following error:

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(label=3.0, features=SparseVector(2000, {24: 1.0, 51: 2.0, 119: 1.0, 167: 1.0, 182: 1.0, 190: 1.0, 195: 1.0, 28
5: 1.0, 432: 1.0, 539: 1.0, 571: 1.0, 630: 1.0, 638: 1.0, 656: 1.0, 660: 2.0, 751: 1.0, 785: 1.0, 794: 1.0, 801: 1.
0, 823: 1.0, 893: 1.0, 900: 1.0, 915: 1.0, 956: 1.0, 966: 1.0, 1025: 1.0, 1029: 1.0, 1035: 1.0, 1038: 1.0, 1093: 1.
0, 1115: 2.0, 1147: 1.0, 1206: 1.0, 1252: 1.0, 1261: 1.0, 1262: 1.0, 1268: 1.0, 1304: 1.0, 1351: 1.0, 1378: 1.0, 14
23: 1.0, 1437: 1.0, 1441: 1.0, 1530: 1.0, 1534: 1.0, 1556: 1.0, 1562: 1.0, 1604: 1.0, 1711: 1.0, 1737: 1.0, 1750: 1
.0, 1776: 1.0, 1858: 1.0, 1865: 1.0, 1923: 1.0, 1926: 1.0, 1959: 1.0, 1999: 1.0})), Row(label=5.0, features=SparseV
ector(2000, {103: 1.0, 310: 1.0, 601: 1.0, 817: 1.0, 866: 1.0, 940: 1.0, 1023: 1.0, 1118: 1.0, 1339: 1.0, 1447: 1.0
, 1634: 1.0, 1776: 1.0}))]
Traceback (most recent call last):
  File "/home/LR/test.py", line 260, in <module>
    accuracy = TrainLRCModel(trainData, testData)
  File "/home/LR/test.py", line 230, in TrainLRCModel
    model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/classification.py", line 382, in train
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/regression.py", line 206, in _regression_train_wrapper
TypeError: data should be an RDD of LabeledPoint, but got <class 'pyspark.sql.types.Row'>

Case 3: I then converted the dataset into an RDD of LabeledPoint so that I could use LogisticRegressionWithLBFGS, as follows:

    #imported libraries
    from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint

    print(type(trainingData))
    print(trainingData.take(2))
    trainingData = trainingData.map(lambda row:[LabeledPoint(row.label,row.features)])
    print('type of trainingData')
    print(type(trainingData))
    print(trainingData.take(2))
    model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
    print(type(model))

Got the following error:

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(label=2.0, features=SparseVector(2000, {51: 1.0, 160: 1.0, 341: 1.0, 417: 1.0, 561: 1.0, 656: 1.0, 863: 1.0, 9
39: 1.0, 1021: 1.0, 1324: 1.0, 1433: 1.0, 1573: 1.0, 1604: 1.0, 1720: 1.0})), Row(label=3.0, features=SparseVector(
2000, {24: 1.0, 51: 2.0, 119: 1.0, 167: 1.0, 182: 1.0, 190: 1.0, 195: 1.0, 285: 1.0, 432: 1.0, 539: 1.0, 571: 1.0, 
630: 1.0, 638: 1.0, 656: 1.0, 660: 2.0, 751: 1.0, 785: 1.0, 794: 1.0, 801: 1.0, 823: 1.0, 893: 1.0, 900: 1.0, 915: 
1.0, 956: 1.0, 966: 1.0, 1025: 1.0, 1029: 1.0, 1035: 1.0, 1038: 1.0, 1093: 1.0, 1115: 2.0, 1147: 1.0, 1206: 1.0, 12
52: 1.0, 1261: 1.0, 1262: 1.0, 1268: 1.0, 1304: 1.0, 1351: 1.0, 1378: 1.0, 1423: 1.0, 1437: 1.0, 1441: 1.0, 1530: 1
.0, 1534: 1.0, 1556: 1.0, 1562: 1.0, 1604: 1.0, 1711: 1.0, 1737: 1.0, 1750: 1.0, 1776: 1.0, 1858: 1.0, 1865: 1.0, 1
923: 1.0, 1926: 1.0, 1959: 1.0, 1999: 1.0}))]
type of trainingData
<class 'pyspark.rdd.PipelinedRDD'>
[[LabeledPoint(2.0, (2000,[51,160,341,417,561,656,863,939,1021,1324,1433,1573,1604,1720],[1.0,1.0,1.0,1.0,1.0,1.0,1
.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))], [LabeledPoint(3.0, (2000,[24,51,119,167,182,190,195,285,432,539,571,630,638,656
,660,751,785,794,801,823,893,900,915,956,966,1025,1029,1035,1038,1093,1115,1147,1206,1252,1261,1262,1268,1304,1351,
1378,1423,1437,1441,1530,1534,1556,1562,1604,1711,1737,1750,1776,1858,1865,1923,1926,1959,1999],[1.0,2.0,1.0,1.0,1.
0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1
.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]]
Traceback (most recent call last):
  File "/home/LR/test.py", line 260, in <module>
    accuracy = TrainLRCModel(trainData, testData)
  File "/home/LR/test.py", line 230, in TrainLRCModel
    model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/classification.py", line 381, in train
AttributeError: 'list' object has no attribute 'features'

Can someone please suggest what I am missing? I want to use Logistic Regression in PySpark for multi-class classification.

I am currently using Spark 1.6.2 and Python 2.7.9 on Google Cloud.

Thank you in advance for your kind help.


Solution

  • Case 1: There is nothing strange here. As the error message says, LogisticRegression in the ML package of Spark 1.6 supports only binary classification, which is also stated in the documentation.

    Case 2: Here you have switched from ML to MLlib, which does not work with DataFrames; it needs the input as an RDD of LabeledPoint (documentation), so the error message is again expected.

    Case 3: Here is where things get interesting. First, you should remove the brackets from your map function, so that each RDD element is a LabeledPoint rather than a one-element list containing one; i.e. it should be

    trainingData = trainingData.map(lambda row: LabeledPoint(row.label, row.features)) # no brackets after "row:"
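
To illustrate why the brackets matter, here is a minimal pure-Python sketch with no Spark involved. `FakePoint` is a hypothetical stand-in for `LabeledPoint`, and `first_features` is an illustrative helper; the only thing taken from the question is that, per the Case 3 traceback, `train` reads `.features` from an object that turned out to be a list.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.mllib.regression.LabeledPoint (assumption,
# used only so that this sketch runs without Spark).
FakePoint = namedtuple("FakePoint", ["label", "features"])

rows = [FakePoint(2.0, [1.0, 0.0]), FakePoint(3.0, [0.0, 1.0])]

# With brackets: every element is a one-item list wrapping the point.
wrapped = [[FakePoint(r.label, r.features)] for r in rows]
# Without brackets: every element is the point itself.
flat = [FakePoint(r.label, r.features) for r in rows]


def first_features(elements):
    """Read .features from the first element (illustrative helper)."""
    return elements[0].features


try:
    first_features(wrapped)
    wrapped_error = None
except AttributeError as exc:
    # A list has no .features attribute, matching the error in Case 3.
    wrapped_error = str(exc)

flat_features = first_features(flat)
```

The wrapped version fails for the same reason as the code in the question, while the flat version exposes the feature vector directly.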
    

    Nevertheless, guessing from the code snippets you have provided, most probably you are going to get a different error now:

    model = LogisticRegressionWithLBFGS.train(trainingData, numClasses=5)
    [...]
    : org.apache.spark.SparkException: Input validation failed.
    

    Here is what is happening (it took me some time to figure it out), using some dummy data (it's always a good idea to provide some sample data with your question):

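    # imports needed for the examples below; sc is assumed to be an existing SparkContext
    from pyspark.mllib.classification import LogisticRegressionWithLBFGS
    from pyspark.mllib.linalg import SparseVector
    from pyspark.mllib.regression import LabeledPoint

    # 3-class classification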
    data = sc.parallelize([
         LabeledPoint(3.0, SparseVector(100,[10, 98],[1.0, 1.0])),
         LabeledPoint(1.0, SparseVector(100,[1, 22],[1.0, 1.0])),
         LabeledPoint(2.0, SparseVector(100,[36, 54],[1.0, 1.0]))
    ])
    
    lrm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3) # throws exception
    [...]
    : org.apache.spark.SparkException: Input validation failed.
    

    The problem is that your labels must start from 0 (this is not obvious from the API documentation of LogisticRegressionWithLBFGS; I had to dig into the Scala source code to confirm it). So, mapping the labels in my dummy data above from (1.0, 2.0, 3.0) to (0.0, 1.0, 2.0), we finally get:

    # 3-class classification
    data = sc.parallelize([
         LabeledPoint(2.0, SparseVector(100,[10, 98],[1.0, 1.0])),
         LabeledPoint(0.0, SparseVector(100,[1, 22],[1.0, 1.0])),
         LabeledPoint(1.0, SparseVector(100,[36, 54],[1.0, 1.0]))
    ])
    
    lrm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3) # no error now
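
The difference between the two datasets above can be sketched in plain Python. The function below is only an approximation of the multiclass label check, written under the assumption that a label is valid when it is a whole number between 0 and numClasses - 1; `labels_are_valid` is a hypothetical name, not a Spark API.

```python
def labels_are_valid(labels, num_classes):
    """Return True if every label is a whole number in [0, num_classes - 1].

    Assumption: this approximates the validation that rejects 1-based
    labels; it is not Spark's actual implementation.
    """
    for label in labels:
        is_whole = float(label) == int(label)
        in_range = 0 <= label <= num_classes - 1
        if not (is_whole and in_range):
            return False
    return True


one_based = [3.0, 1.0, 2.0]   # labels from the failing example
zero_based = [2.0, 0.0, 1.0]  # labels from the working example

one_based_ok = labels_are_valid(one_based, num_classes=3)
zero_based_ok = labels_are_valid(zero_based, num_classes=3)
```

Under this assumption the 1-based labels are rejected because 3.0 exceeds numClasses - 1 = 2, while the 0-based labels pass.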
    

    Judging from your numClasses=5 argument, as well as from the label=5.0 in one of your printed records, I guess that most probably your code suffers from the same issue. Remap your labels to the range 0.0 to 4.0 (i.e. 0.0, 1.0, 2.0, 3.0, 4.0) and you should be fine.
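
One way to do the remapping, sketched under the assumption that the labels are exactly 1.0 through 5.0, is to subtract 1 from each label while building the `LabeledPoint`. `shift_label` is a hypothetical helper; the Spark call appears only as a comment because it needs the live `trainingData` DataFrame from the question.

```python
def shift_label(label, offset=1.0):
    """Map a 1-based class label to a 0-based one (hypothetical helper)."""
    return float(label) - offset


original_labels = [1.0, 2.0, 3.0, 4.0, 5.0]
shifted_labels = [shift_label(label) for label in original_labels]

# Hypothetical use with the question's DataFrame (Spark 1.6 API, not run here):
# trainingData = trainingData.map(
#     lambda row: LabeledPoint(shift_label(row.label), row.features))
```

Predictions from a model trained this way are also 0-based, so add the offset back if the original label values are needed. If the labels are not contiguous, a lookup built from the sorted distinct values would be needed instead of a fixed offset.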

    (I suggest that you delete the other identical question you have opened here, to reduce clutter...)