pyspark databricks cross-validation databricks-community-edition

Why do I get a "label does not exist" error when using CrossValidator?


I'm new to Spark :) I'm trying to use CrossValidator. My model is as follows:

Training

# training data - several split ratios were tested, 50/50 seems the best
(trainData, testData) = modelData.randomSplit([0.5, 0.5])

#counting data used
print("Training dataset count : " +str(trainData.count()))
print("Test dataset count : " +str(testData.count()))
trainData.cache()
testData.cache()

Model

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'v4_Indexer', maxIter = 5)
lrModel = lr.fit(trainData)
predictions = lrModel.transform(testData)
predictions.select('v4_Indexer','features','rawPrediction', 'prediction', 'probability').toPandas().head(2500)

I tried this code for cross-validation:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[lr])
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0, 0.5, 1])
             .addGrid(lr.elasticNetParam, [0, 0.5, 1])
             .addGrid(lr.maxIter, [1, 10])
             .build())
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(trainData)
trainingSummary = cvModel.bestModel

I get a warning:

/databricks/spark/python/pyspark/ml/util.py:92: UserWarning: CrossValidator_7ba8c8c903af fit call failed but some spark jobs may still running for unfinished trials. To address this issue, you should enable pyspark pinned thread mode.
  warnings.warn("{} fit call failed but some spark jobs "

And an error:

IllegalArgumentException: label does not exist. Available: v4_Indexer, features, CrossValidator_7ba8c8c903af_rand

This model worked for a while. I do not understand why it no longer does.

Thanks in advance for any help =)


Solution

  • I solved the issue by completely rewriting my code. This is what it looks like now (prerequisite: %pip install mlflow):

    from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
    from pyspark.ml.feature import StringIndexer
    from pyspark.ml import Pipeline
    
    # StringIndexer: index the values of the label column "v4_Indexer" into a new column "indexedLabel"
    indexer = StringIndexer(inputCol="v4_Indexer", outputCol="indexedLabel")
    
    # Create an evaluator.  In this case, use "weightedPrecision".
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    evaluator = MulticlassClassificationEvaluator(labelCol="v4_Indexer", metricName="weightedPrecision")
    
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    
    # DecisionTreeClassifier: Learn to predict column "indexedLabel" using the "features" column
    
    dtc = DecisionTreeClassifier(labelCol="indexedLabel")
    
    # Chain indexer + dtc together into a single ML Pipeline
    pipeline = Pipeline(stages=[indexer, dtc])
    # Define the parameter grid to examine.
    grid = ParamGridBuilder().addGrid(dtc.maxDepth, [2, 3, 4, 5, 6, 7, 8]).addGrid(dtc.maxBins, [2, 4, 8]).build()
    
    # Create a cross validator, using the pipeline, evaluator, and parameter grid you created in previous steps.
    cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3)
    
    # Explicitly create a new run.
    # This allows this cell to be run multiple times.
    # If you omit mlflow.start_run(), then this cell could run once, but a second run would hit conflicts when attempting to overwrite the first run.
    import mlflow
    import mlflow.spark
    
    with mlflow.start_run():
        # Run the cross validation on the training dataset. The cv.fit() call returns the best model it found.
        cvModel = cv.fit(trainData)
        
        # Evaluate the best model's performance on the test dataset and log the result.
        test_metric = evaluator.evaluate(cvModel.transform(testData))
        mlflow.log_metric('test_' + evaluator.getMetricName(), test_metric) 
      
        # Log the best model.
        mlflow.spark.log_model(spark_model=cvModel.bestModel, artifact_path='best-model')