Tags: scala, apache-spark, apache-spark-ml

Spark - How to use QuantileDiscretizer with RandomForestClassifier


Is it possible to use QuantileDiscretizer, keeping NaN values, with a RandomForestClassifier?

I have been getting an error like this:

18/03/23 17:38:15 ERROR Executor: Exception in task 3.0 in stage 133.0 (TID 381)
java.lang.IllegalArgumentException: DecisionTree given invalid data: Feature 1 is categorical with values in {0,...,1, but a data point gives it value 2.0.
  Bad data point: (1.0,[1.0,2.0])

Example

The idea here is to create a numeric column and discretize it using quantiles, keeping invalid numbers (NaN) in a special bucket.

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler,
  QuantileDiscretizer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassifier}

val tseq = Seq((0, "a", 1.0), (1, "b", 0.0), (2, "c", 2.0),
               (3, "a", 1.0), (4, "a", 3.0), (5, "c", Double.NaN))
val tdf = SparkInit.ss.createDataFrame(tseq).toDF("id", "category", "class")
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
val discr = new QuantileDiscretizer()
  .setInputCol("class")
  .setOutputCol("quant")
  .setNumBuckets(2)
  .setHandleInvalid("keep")
val assembler = new VectorAssembler()
  .setInputCols(Array("categoryIndex", "quant"))
  .setOutputCol("features")
val rf = new RandomForestClassifier()
  .setLabelCol("categoryIndex")
  .setFeaturesCol("features")
  .setNumTrees(3)
new Pipeline()
  .setStages(Array(indexer, discr, assembler, rf))
  .fit(tdf)
  .transform(tdf)
  .show()

Without trying to fit the Random Forest, I was getting a DataFrame like this:

+---+--------+-----+-------------+-----+---------+
| id|category|class|categoryIndex|quant| features|
+---+--------+-----+-------------+-----+---------+
|  0|       a|  1.0|          0.0|  1.0|[0.0,1.0]|
|  1|       b|  0.0|          2.0|  0.0|[2.0,0.0]|
|  2|       c|  2.0|          1.0|  1.0|[1.0,1.0]|
|  3|       a|  1.0|          0.0|  1.0|[0.0,1.0]|
|  4|       a|  3.0|          0.0|  1.0|[0.0,1.0]|
|  5|       c|  NaN|          1.0|  2.0|[1.0,2.0]|
+---+--------+-----+-------------+-----+---------+
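
For reference, a DataFrame like this can be obtained by running the same stages without the final RandomForestClassifier; a minimal sketch reusing the indexer, discr, assembler and tdf values defined above:

// Same pipeline as above, minus the classifier, so the intermediate
// columns (categoryIndex, quant, features) can be inspected.
new Pipeline()
  .setStages(Array(indexer, discr, assembler))
  .fit(tdf)
  .transform(tdf)
  .show()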

If I try to fit the model, I get the error:

18/03/23 17:54:12 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances)
18/03/23 17:54:12 WARN BlockManager: Putting block rdd_490_3 failed due to an exception
18/03/23 17:54:12 WARN BlockManager: Block rdd_490_3 could not be removed as it was not found on disk or in memory
18/03/23 17:54:12 ERROR Executor: Exception in task 3.0 in stage 143.0 (TID 414)
java.lang.IllegalArgumentException: DecisionTree given invalid data: Feature 1 is categorical with values in {0,...,1, but a data point gives it value 2.0.
  Bad data point: (1.0,[1.0,2.0])
    at org.apache.spark.ml.tree.impl.TreePoint$.findBin(TreePoint.scala:124)
    at org.apache.spark.ml.tree.impl.TreePoint$.org$apache$spark$ml$tree$impl$TreePoint$$labeledPointToTreePoint(TreePoint.scala:93)
    at org.apache.spark.ml.tree.impl.TreePoint$$anonfun$convertToTreeRDD$2.apply(TreePoint.scala:73)
    at org.apache.spark.ml.tree.impl.TreePoint$$anonfun$convertToTreeRDD$2.apply(TreePoint.scala:72)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)

Does QuantileDiscretizer insert some kind of metadata about the special extra bucket? It seems odd, because I had previously been able to build a model using columns with the same values, just without forcing any discretization.

Update

Yes, the column does have metadata attached, and it looks like this:

org.apache.spark.sql.types.Metadata = {"ml_attr":
   {"ord":true,
    "vals":["-Infinity, 5.0","5.0, 10.0","10.0, Infinity"],
    "type":"nominal"}
}
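
This metadata can be read straight off the DataFrame's schema; a minimal sketch, assuming bucketed names the output of the discretizer stage (a value not present in the original snippet, which builds the whole Pipeline in one expression):

// Fit and apply only the QuantileDiscretizer, then inspect the
// ml_attr metadata attached to the generated "quant" column.
val bucketed = discr.fit(tdf).transform(tdf)
println(bucketed.schema("quant").metadata.json)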

The question now might be: how to correctly set the metadata so that it includes values like Double.NaN?


Solution

  • The workaround I used was simply to remove the associated metadata from the discretized columns, letting the decision tree implementation decide what to do with the data. I think the column then effectively becomes a numerical column ([0, 1, 2, 2, 1], for example), but, if it ends up with too many distinct values, it can be binned again (look for the parameter maxBins; the sketch at the end of this answer shows where it is set).

    In my case, the simplest way to remove the metadata was to call na.fill on the DataFrame after applying QuantileDiscretizer (the fill rebuilds the column, dropping the attached metadata):

    // Nothing is actually filled in my case, since there were no missing
    // values before this operation.
    df.na.fill(Double.NaN, Array("quant"))
    

    I'm fairly sure you could also remove the metadata manually by accessing the column object directly.

    Update

    We can change a column's metadata by creating an alias (reference):

    val metadata: Metadata = ...
    df.select($"colA".as("colB", metadata))
    

    This answer describes a way to get a column's metadata by fetching the corresponding StructField from the DataFrame's schema.
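
    Putting these pieces together, here is a minimal sketch; it assumes df is the DataFrame produced by the QuantileDiscretizer stage (as in the snippets above), and cleaned, rf2 and the value 64 are only illustrative names and numbers, not part of the original code:

    import org.apache.spark.sql.types.Metadata
    import org.apache.spark.ml.classification.RandomForestClassifier

    // Read the ml_attr metadata currently attached to "quant" via its StructField.
    println(df.schema("quant").metadata.json)

    // Rebuild "quant" with empty metadata (do this before the VectorAssembler
    // stage, so the assembled features vector no longer marks it as nominal).
    val cleaned = df.withColumn("quant", df("quant").as("quant", Metadata.empty))

    // If the now-continuous feature ends up with many distinct values, maxBins
    // can be raised on the classifier (64 is only an illustrative value).
    val rf2 = new RandomForestClassifier()
      .setLabelCol("categoryIndex")
      .setFeaturesCol("features")
      .setNumTrees(3)
      .setMaxBins(64)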