apache-spark, pyspark, cross-validation, metrics

Custom Evaluator in PySpark


I want to optimize the hyperparameters of a PySpark Pipeline using a ranking metric (MAP@k). I have seen in the documentation how to use the metrics defined in the Evaluation module (Scala), but I need to define a custom evaluator class because MAP@k is not implemented yet. So I need to do something like:

model = Pipeline(stages=[indexer, assembler, scaler, lg])
paramGrid_lg = ParamGridBuilder() \
    .addGrid(lg.regParam, [0.001, 0.1]) \
    .addGrid(lg.elasticNetParam, [0, 1]) \
    .build()

crossval_lg = CrossValidator(estimator=model,
                             estimatorParamMaps=paramGrid_lg,
                             evaluator=MAPkEvaluator(),
                             numFolds=2)

where MAPkEvaluator() is my custom evaluator. I've seen a similar question, but it has no answer.

Is there any example or documentation available for this? Does anyone know if it is possible to implement it in PySpark? Which methods should I implement?


Solution

  • @jarandaf answered the question in the first comment, but for clarity's sake here is how to implement a basic example with a random metric:

    import random
    from pyspark.ml.evaluation import Evaluator

    class RandomEvaluator(Evaluator):

        def __init__(self, predictionCol="prediction", labelCol="label"):
            super().__init__()  # initialize the Params machinery of the base Evaluator
            self.predictionCol = predictionCol
            self.labelCol = labelCol

        def _evaluate(self, dataset):
            """
            Returns a random number.
            Implement the real metric here.
            """
            return random.randint(0, 1)

        def isLargerBetter(self):
            # CrossValidator keeps the model with the highest score when this is True
            return True


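    Calling the inherited evaluate() method is a quick way to sanity-check the class outside of cross-validation, since it dispatches to the _evaluate() implementation above. The predictions DataFrame below is just a placeholder for whatever scored data you already have:

    evaluator = RandomEvaluator()
    print(evaluator.evaluate(predictions))   # delegates to _evaluate(predictions)
    print(evaluator.isLargerBetter())        # True
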
    Now the following code should work:

    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    
    paramGrid_lg = ParamGridBuilder() \
        .addGrid(lg.regParam, [0.01, 0.1]) \
        .addGrid(lg.elasticNetParam, [0, 1]) \
        .build()
    
    crossval_lg = CrossValidator(estimator=model,
                                 estimatorParamMaps=paramGrid_lg,
                                 evaluator=RandomEvaluator(),
                                 numFolds=2)
    
    cvModel = crossval_lg.fit(train_val_data_)
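
    For the MAP@k metric asked about in the question, one possible sketch (not the original poster's code) delegates to RankingMetrics from pyspark.mllib.evaluation inside _evaluate. The column names recommendations and relevant_items, and the assumption that each row already holds an ordered list of predicted items plus a list of relevant items, are illustrative and would need to be adapted to your data; meanAveragePrecisionAt(k) requires Spark 3.0+, so the sketch falls back to meanAveragePrecision over the truncated lists on older versions:

    from pyspark.ml.evaluation import Evaluator
    from pyspark.mllib.evaluation import RankingMetrics

    class MAPkEvaluator(Evaluator):

        def __init__(self, k=10, predictionCol="recommendations", labelCol="relevant_items"):
            super().__init__()
            self.k = k
            self.predictionCol = predictionCol   # ordered list of predicted items per row
            self.labelCol = labelCol             # list of relevant (ground-truth) items per row

        def _evaluate(self, dataset):
            k = self.k
            # RankingMetrics expects an RDD of (predicted items, ground-truth items) pairs
            prediction_and_labels = dataset.select(self.predictionCol, self.labelCol) \
                .rdd.map(lambda row: (row[0][:k], row[1]))
            metrics = RankingMetrics(prediction_and_labels)
            try:
                return metrics.meanAveragePrecisionAt(k)   # Spark >= 3.0
            except AttributeError:
                return metrics.meanAveragePrecision        # fallback on truncated lists

        def isLargerBetter(self):
            return True

    With that in place, evaluator=MAPkEvaluator(k=10) can be passed to the CrossValidator exactly as in the question.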