Tags: apache-spark, machine-learning, pyspark, cross-validation, apache-spark-ml

Cross validation in PySpark


I used cross validation to train a linear regression model using the following code:

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LinearRegression(maxIter=maxIteration)
modelEvaluator = RegressionEvaluator()
pipeline = Pipeline(stages=[lr])
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])
             .addGrid(lr.elasticNetParam, [0, 1])
             .build())

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=modelEvaluator,
                          numFolds=3)

cvModel = crossval.fit(training)

Now I want to draw the ROC curve. I used the following code, but I get this error:

'LinearRegressionTrainingSummary' object has no attribute 'areaUnderROC'

trainingSummary = cvModel.bestModel.stages[-1].summary
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

I also want to check the objectiveHistory at each iteration. I know that I can get it at the end:

print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))

but I want to get it at each iteration. How can I do this?

Moreover, I want to evaluate the model on the test data. How can I do that?

prediction = cvModel.transform(test)

I know for the training data set I can write:

print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

but how can I get these metrics for the testing data set?


Solution

  • 1) The area under the ROC curve (AUC) is defined only for binary classification, so you cannot use it for a regression task, as you are trying to do here; that is exactly why LinearRegressionTrainingSummary has no areaUnderROC attribute.
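To see concretely why AUC requires binary labels: it measures how often a randomly chosen positive example is scored above a randomly chosen negative one, which has no meaning for continuous regression targets. A minimal pure-Python sketch of this rank-based definition (the `auc` helper is mine for illustration, not a Spark API):

```python
# Illustrative helper (not part of Spark): rank-based AUC. It needs a
# clean split of examples into positives (label 1) and negatives (label 0),
# which is why it cannot apply to continuous regression labels.
def auc(scores, labels):
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    # count how often a positive outscores a negative (ties count half)
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

print(auc([0.9, 0.8, 0.3, 0.1], [1, 1, 0, 0]))  # perfect ranking -> 1.0
```

With continuous labels there is no positive/negative split to rank against, hence no ROC curve to draw for a LinearRegression model.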

    2) The objectiveHistory for each iteration is only available when the solver argument of the regression is "l-bfgs" (see the LinearRegression documentation); here is a toy example:

    spark.version
    # u'2.1.1'
    
    from pyspark.ml import Pipeline
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    
    dataset = spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.2),
             (Vectors.dense([0.4]), 1.4),
             (Vectors.dense([0.5]), 1.9),
             (Vectors.dense([0.6]), 0.9),
             (Vectors.dense([1.2]), 1.0)] * 10,
             ["features", "label"])
    
    lr = LinearRegression(maxIter=5, solver="l-bfgs") # solver="l-bfgs" here
    
    modelEvaluator = RegressionEvaluator()  # default metric: rmse
    # no Pipeline needed here: lr is passed directly to CrossValidator below,
    # so the summary is available as cvModel.bestModel.summary
    paramGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.1, 0.01])
                 .addGrid(lr.elasticNetParam, [0, 1])
                 .build())
    
    crossval = CrossValidator(estimator=lr,
                              estimatorParamMaps=paramGrid,
                              evaluator=modelEvaluator,
                              numFolds=3)
    
    cvModel = crossval.fit(dataset)
    
    trainingSummary = cvModel.bestModel.summary
    
    trainingSummary.totalIterations
    # 2
    trainingSummary.objectiveHistory # one value for each iteration
    # [0.49, 0.4511834723904831]
    

    3) You have already defined a RegressionEvaluator which you can use for evaluating your test set but, if used without arguments, it assumes the RMSE metric; here is a way to define evaluators with different metrics and apply them to your test set (continuing the code from above):

    test = spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.2),
             (Vectors.dense([0.4]), 1.1),
             (Vectors.dense([0.5]), 0.9),
             (Vectors.dense([0.6]), 1.0)],
            ["features", "label"])
    
    modelEvaluator.evaluate(cvModel.transform(test))  # rmse by default, if not specified
    # 0.35384585061028506
    
    eval_rmse = RegressionEvaluator(metricName="rmse")
    eval_r2 = RegressionEvaluator(metricName="r2")
    
    eval_rmse.evaluate(cvModel.transform(test)) # same as above
    # 0.35384585061028506
    
    eval_r2.evaluate(cvModel.transform(test))
    # -0.001655087952929124
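For intuition about the two numbers above: rmse and r2 follow their standard textbook definitions, which is what RegressionEvaluator computes under the hood. A plain-Python sketch (the helper names are mine for illustration, not Spark APIs):

```python
# Standard definitions of the two regression metrics used above.
def rmse(labels, preds):
    # root of the mean squared error
    return (sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels)) ** 0.5

def r2(labels, preds):
    # 1 - (residual sum of squares) / (total sum of squares around the mean)
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

print(rmse([1.0, 2.0], [1.0, 2.0]))  # 0.0 for a perfect fit
```

Note that r2 can go negative, as in the test-set result above: it simply means the model predicts worse on that data than always guessing the mean label would.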