Tags: scala, apache-spark, machine-learning, type-parameter, f-bounded-polymorphism

Type parameter bounds with Spark objects are hard to get right


I'm a beginner at Scala.

I'm trying to create an object with a method that accepts a ProbabilisticClassifier as input and yields a CrossValidator as output:

import org.apache.spark.ml.classification.{ProbabilisticClassifier, ProbabilisticClassificationModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

import constants.Const

object MyModels {

  def loadOrCreateModel[A, M, T](
    model: ProbabilisticClassifier[Vector[T], A, M],
    paramGrid: Array[ParamMap]): CrossValidator = {

    // Binary evaluator.
    val binEvaluator = (
      new BinaryClassificationEvaluator()
        .setLabelCol("yCol")
      )

    // Cross validator.
    val cvModel = (
      new CrossValidator()
        .setEstimator(model)
        .setEvaluator(binEvaluator)
        .setEstimatorParamMaps(paramGrid)
        .setNumFolds(3)
      )
    cvModel
  }
}

But this gives me:

sbt package
[info] Loading project definition from somepath/project
[info] Loading settings from build.sbt ...
[info] Set current project to xxx (in build file:somepath/)
[info] Compiling 1 Scala source to somepath/target/scala-2.11/classes ...
[error] somepath/src/main/scala/models.scala:11:12: type arguments [Vector[T],A,M] do not conform to class ProbabilisticClassifier's type parameter bounds [FeaturesType,E <: org.apache.spark.ml.classification.ProbabilisticClassifier[FeaturesType,E,M],M <: org.apache.spark.ml.classification.ProbabilisticClassificationModel[FeaturesType,M]]
[error]     model: ProbabilisticClassifier[Vector[T], A, M],
[error]            ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 3 s, completed Mar 31, 2018 4:22:31 PM
makefile:127: recipe for target 'target/scala-2.11/classes/models/XModels.class' failed
make: *** [target/scala-2.11/classes/models/XModels.class] Error 1

I have tried several combinations of the [A, M, T] parameters as well as different types inside the method's arguments.

The idea is to be able to feed a LogisticRegression or a RandomForestClassifier into this function. From the documentation:

class LogisticRegression extends ProbabilisticClassifier[Vector, LogisticRegression, LogisticRegressionModel] with LogisticRegressionParams with DefaultParamsWritable with Logging
class RandomForestClassifier extends ProbabilisticClassifier[Vector, RandomForestClassifier, RandomForestClassificationModel] with RandomForestClassifierParams with DefaultParamsWritable
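
For illustration, the intended usage would be something like this (a hypothetical sketch, with paramGrid being an Array[ParamMap] as above; none of this compiles yet with my signature):

val logistic = new LogisticRegression()
val forest = new RandomForestClassifier()

val cv1 = MyModels.loadOrCreateModel(logistic, paramGrid)
val cv2 = MyModels.loadOrCreateModel(forest, paramGrid)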

Can someone point me to the resources I need to learn in order to implement this method?

I'm using Spark 2.1.0.


Edit 01

Thanks @Andrey Tyukin.

I'm sorry the code was not reproducible; it was in fact a string. Your code does compile, but maybe I expressed myself poorly, because calling it with a LogisticRegression fails:

<console>:35: error: type mismatch;
found   : org.apache.spark.ml.classification.LogisticRegression
required: org.apache.spark.ml.classification.ProbabilisticClassifier[Vector[?],?,?]
    val cvModel = models.TalkingDataModels.loadOrCreateModel(logistic_regressor, paramGrid)

So maybe my idea was wrong from the very beginning. Is it possible to create a method that accepts both LogisticRegression and RandomForestClassifier objects?

  • Edited the code to be an MCVE:

    import org.apache.spark.ml.classification.{ProbabilisticClassifier, ProbabilisticClassificationModel}
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
    import org.apache.spark.ml.classification.LogisticRegression
    
    object MyModels {

      def main(array: Array[String]): Unit = {
        val logisticRegressor = (
          new LogisticRegression()
            .setFeaturesCol("yCol")
            .setLabelCol("labels")
            .setMaxIter(10)
          )
        val paramGrid = (
          new ParamGridBuilder()
            .addGrid(logisticRegressor.regParam, Array(0.01, 0.1, 1))
            .build()
          )
        loadOrCreateModel(logisticRegressor, paramGrid)
        println()
      }

      def loadOrCreateModel[
        F,
        M <: ProbabilisticClassificationModel[Vector[F], M],
        P <: ProbabilisticClassifier[Vector[F], P, M]
      ](
        probClassif: ProbabilisticClassifier[Vector[F], P, M],
        paramGrid: Array[ParamMap]
      ): CrossValidator = {

        // Binary evaluator.
        val binEvaluator =
          new BinaryClassificationEvaluator()
          .setLabelCol("y")

        // Cross validator.
        val cvModel =
          new CrossValidator()
          .setEstimator(probClassif)
          .setEvaluator(binEvaluator)
          .setEstimatorParamMaps(paramGrid)
          .setNumFolds(3)

        cvModel
      }
    }
    

Solution

  • This compiles, but I had to throw out your constants.Const.yColumn string and replace it with the magic value "y":

    import org.apache.spark.ml.classification.{ProbabilisticClassifier, ProbabilisticClassificationModel}
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
    
    object CrossValidationExample {
    
      def loadOrCreateModel[
        F, 
        M <: ProbabilisticClassificationModel[Vector[F], M],
        P <: ProbabilisticClassifier[Vector[F], P, M]
      ](
        probClassif: ProbabilisticClassifier[Vector[F], P, M],
        paramGrid: Array[ParamMap]
      ): CrossValidator = {
    
        // Binary evaluator.
        val binEvaluator = 
          new BinaryClassificationEvaluator()
          .setLabelCol("y")
    
        // Cross validator.
        val cvModel = 
          new CrossValidator()
          .setEstimator(probClassif)
          .setEvaluator(binEvaluator)
          .setEstimatorParamMaps(paramGrid)
          .setNumFolds(3)
    
        cvModel
      }
    }
    

    Before defining the list of generic parameters, it might be helpful to perform a topological sort in your mind, to understand which parameters depend on which others.

    Here, the model depends on the type of the features, and the probabilistic classifier depends both on the type of the features and on the type of the model. Thus, it makes more sense to declare the type parameters in the order features, model, classifier. Then you have to get the F-bounded polymorphism right.
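
    To make the F-bound concrete outside of Spark, here is a minimal toy sketch with the same shape (the names Model, Classifier and train are invented for illustration). Each classifier type C refers to its own concrete type and to the concrete type of the model it produces, which is what lets a generic method return the precise model type instead of a vague supertype:

    trait Model[F, M <: Model[F, M]] {
      def predict(features: F): Double
    }

    trait Classifier[F, C <: Classifier[F, C, M], M <: Model[F, M]] {
      def fit(features: F): M
    }

    // Type parameters declared in dependency order: features, model, classifier.
    def train[F, M <: Model[F, M], C <: Classifier[F, C, M]](
      classifier: Classifier[F, C, M],
      features: F
    ): M = classifier.fit(features)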


    Ah, and by the way, the Egyptian-brackets indentation style is IMHO the only sane way to indent multiple argument lists with type arguments that are fifty miles long (unfortunately, you can't change the length of the type names; they tend to be quite lengthy in every machine learning library I've seen).


    EDIT (answer for the second, MCVE part)

    That's a pretty straightforward generalization. If LogisticRegression wants an org.apache.spark.ml.linalg.Vector instead of a Vector[F], then just abstract over the whole features type too:

    import org.apache.spark.ml.classification.{ProbabilisticClassifier, ProbabilisticClassificationModel}
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
    import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.linalg.{Vector => LinalgVector}
    
    object CrossValidationExample {

      def main(array: Array[String]): Unit = {
        val logisticRegressor = (
          new LogisticRegression()
            .setFeaturesCol("yCol")
            .setLabelCol("labels")
            .setMaxIter(10)
          )
        val paramGrid = (
          new ParamGridBuilder()
            .addGrid(logisticRegressor.regParam, Array(0.01, 0.1, 1))
            .build()
          )

        loadOrCreateModel(logisticRegressor, paramGrid)

        val rfc: RandomForestClassifier = ???
        loadOrCreateModel(rfc, paramGrid)
      }

      def loadOrCreateModel[
        FeatVec,
        M <: ProbabilisticClassificationModel[FeatVec, M],
        P <: ProbabilisticClassifier[FeatVec, P, M]
      ](
        probClassif: ProbabilisticClassifier[FeatVec, P, M],
        paramGrid: Array[ParamMap]
      ): CrossValidator = {
        // Binary evaluator.
        val binEvaluator =
          new BinaryClassificationEvaluator()
          .setLabelCol("y")

        // Cross validator.
        val cvModel =
          new CrossValidator()
          .setEstimator(probClassif)
          .setEvaluator(binEvaluator)
          .setEstimatorParamMaps(paramGrid)
          .setNumFolds(3)

        cvModel
      }
    }
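
    Once this compiles, the returned CrossValidator is used like any other estimator. A minimal usage sketch (my own illustration, not part of the original answer), assuming a SparkSession in scope and a DataFrame named training with the columns configured above ("yCol" for the features, "labels" for the classifier's label, "y" for the evaluator's label):

    val cv = CrossValidationExample.loadOrCreateModel(logisticRegressor, paramGrid)
    // fit() runs the grid search with 3-fold cross validation and keeps the best model
    val cvFitted = cv.fit(training)
    // transform() appends prediction and probability columns
    val predictions = cvFitted.transform(training)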