Tags: scala, apache-spark, pyspark, pojo, h2o

Create a map to call the POJO for each row of a Spark DataFrame


I built an H2O model in R and saved the POJO code. I want to score parquet files in HDFS using the POJO, but I'm not sure how to go about it. I plan on reading the parquet files into Spark (Scala/SparkR/PySpark) and scoring them there. Below is the excerpt I found on H2O's documentation page.

"How do I run a POJO on a Spark Cluster?

The POJO provides just the math logic to do predictions, so you won’t find any Spark (or even H2O) specific code there. If you want to use the POJO to make predictions on a dataset in Spark, create a map to call the POJO for each row and save the result to a new column, row-by-row"

Does anyone have some example code of how I can do this? I'd greatly appreciate any assistance. I code primarily in R and SparkR, and I'm not sure how I can "map" the POJO to each line.

Thanks in advance.


Solution

  • I just posted a solution that actually uses DataFrame/Dataset. The post used a Star Wars dataset to build a model in R and then scored the MOJO on the test set in Spark. I'll paste only the relevant part here:

    Scoring with Spark (and Scala)

    You can use either spark-submit or spark-shell. If you use spark-submit, h2o-genmodel.jar needs to be put under the lib folder of the root directory of your Spark application so that it can be added as a dependency at compile time. The following code assumes you're running spark-shell. In order to use h2o-genmodel.jar, you need to add the jar file when launching spark-shell with the --jars flag. For example:

    /usr/lib/spark/bin/spark-shell \
    --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
    --conf spark.driver.memory="3g" \
    --conf spark.executor.memory="10g" \
    --conf spark.executor.instances=10 \
    --conf spark.executor.cores=4 \
    --jars /path/to/h2o-genmodel.jar
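
    If you go with spark-submit instead, the --jars flag works the same way there. A minimal sketch of the launch command (the application jar and main class below are placeholders, not part of the original answer):

    /usr/lib/spark/bin/spark-submit \
    --class com.example.ScoreMojo \
    --jars /path/to/h2o-genmodel.jar \
    --conf spark.driver.memory="3g" \
    --conf spark.executor.memory="10g" \
    /path/to/your-scoring-app.jar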
    

    Now, in the Spark shell, import the dependencies:

    import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
    import _root_.hex.genmodel.MojoModel
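
    The code below also relies on Spark's implicit encoders for toDF and as[...]. The spark-shell imports them automatically, but if you later move this code into a compiled application you would need to add the import yourself (where spark is your SparkSession):

    import spark.implicits._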
    

    Using DataFrame

    val modelPath = "/path/to/zip/file"
    val dataPath = "/path/to/test/data"
    
    // Import data
    val dfStarWars = spark.read.option("header", "true").csv(dataPath)
    // Import MOJO model
    val mojo = MojoModel.load(modelPath)
    val easyModel = new EasyPredictModelWrapper(mojo)
    
    // score
    val dfScore = dfStarWars.map {
      x =>
        val r = new RowData
        r.put("height", x.getAs[String](1))
        r.put("mass", x.getAs[String](2))
        val score = easyModel.predictBinomial(r).classProbabilities
        (x.getAs[String](0), score(1))
    }.toDF("name", "isHumanScore")
    

    The variable score is an array of two class probabilities, one for level 0 and one for level 1. score(1) is the probability of level 1, which is "human". By default the map produces columns with the generic names "_1", "_2", and so on; calling toDF lets you rename them.
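
    Since your data is in parquet on HDFS rather than CSV, only the read (and, if you want to persist the scores, the write) changes; the scoring map stays the same. A minimal sketch with placeholder paths:

    // Read parquet from HDFS instead of CSV (placeholder path)
    val dfParquet = spark.read.parquet("hdfs:///path/to/test/data")
    // Write the scored DataFrame back to HDFS as parquet (placeholder path)
    dfScore.write.mode("overwrite").parquet("hdfs:///path/to/scored/output")

    Keep in mind that parquet columns keep their original types, so you may need to convert numeric columns to String before putting them into RowData (see the note at the end of the Dataset section).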

    Using Dataset

    To use the Dataset API we just need to create two case classes, one for the input data, and one for the output.

    case class StarWars (
      name: String,
      height: String,
      mass: String,
      is_human: String
    )
    
    case class Score (
      name: String,
      isHumanScore: Double
    )
    
    
    // Dataset
    val dtStarWars = dfStarWars.as[StarWars]
    val dtScore = dtStarWars.map {
      x =>
        val r = new RowData
        r.put("height", x.height)
        r.put("mass", x.mass)
        val score = easyModel.predictBinomial(r).classProbabilities
        Score(x.name, score(1))
    }
    

    With a Dataset you can get the value of a column by calling x.columnName directly. Just note that the values you put into RowData have to be Strings, so if the fields in your case class are of other types you need to convert them manually.
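
    For example, if height and mass were numeric rather than String, a sketch along these lines (the typed case class is hypothetical) would convert them before calling put:

    // Hypothetical case class with numeric columns
    case class StarWarsTyped(
      name: String,
      height: Double,
      mass: Double,
      is_human: String
    )

    val dtTyped = dfStarWars
      .withColumn("height", $"height".cast("double"))
      .withColumn("mass", $"mass".cast("double"))
      .as[StarWarsTyped]

    val dtScoreTyped = dtTyped.map { x =>
      val r = new RowData
      // RowData values must be Strings, so convert the numeric fields
      r.put("height", x.height.toString)
      r.put("mass", x.mass.toString)
      Score(x.name, easyModel.predictBinomial(r).classProbabilities(1))
    }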