apache-spark, pipeline, categorical-data, apache-spark-ml

Spark ML pipeline: handling unseen labels


To handle new and unseen labels in a Spark ML pipeline I want to use most-frequent imputation. Suppose the pipeline consists of the following five steps (a rough sketch of the wiring follows the list):

  1. preprocessing
  2. learn most frequent item
  3. stringIndexer for each categorical column
  4. vector assembler
  5. estimator e.g. random forest
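For reference, a rough sketch of how steps (3) to (5) might be wired into the separate pipelines discussed below. The column names, label column and the variables pipeline23/pipeline45 are illustrative assumptions, not code from the question; step (2), learning the most frequent item, is sketched further down.

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// hypothetical categorical columns, for illustration only
val categoricalColumns = Array("colA", "colB")

// (3) one StringIndexer per categorical column
val indexers: Array[PipelineStage] = categoricalColumns.map { c =>
  new StringIndexer().setInputCol(c).setOutputCol(c + "_idx")
}

// (4) assemble the indexed columns into a single feature vector
val assembler = new VectorAssembler()
  .setInputCols(categoricalColumns.map(_ + "_idx"))
  .setOutputCol("features")

// (5) the estimator
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

// separate pipelines as described below: (2, 3) and (4, 5)
val pipeline23 = new Pipeline().setStages(indexers)
val pipeline45 = new Pipeline().setStages(Array[PipelineStage](assembler, rf))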

Assuming (1), (2, 3) and (4, 5) constitute separate pipelines:

  • I can fit and transform (1) for train and test data. This means all NaN values are handled, i.e. imputed.
  • (2, 3) fits nicely, and so does (4, 5).
  • then I can use the following

import org.apache.spark.ml.feature.StringIndexerModel
import org.apache.spark.sql.functions.{lit, when}

// pipeline23 is the fitted PipelineModel for steps (2, 3)
val fittedLabels = pipeline23.stages.collect { case a: StringIndexerModel => a }

// for each categorical column, keep only labels seen during fitting, null out the rest
val result = categoricalColumns.zipWithIndex.foldLeft(validationData) {
  case (currentDF, (colName, idx)) =>
    currentDF.withColumn(colName,
      when(currentDF(colName).isin(fittedLabels(idx).labels: _*), currentDF(colName))
        .otherwise(lit(null)))
}

to replace new/unseen labels with null

  • these deliberately introduced nulls are then imputed by the most-frequent imputer (sketched below)
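As a sketch of that imputation step: Spark's built-in Imputer targets numeric columns, so a hand-rolled most-frequent fill is assumed here. mostFrequentValues and the variable names are illustrative, not the question's code.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, desc}

// learn the most frequent (non-null) value of each categorical column on the
// training data, so it can be used to fill the deliberately introduced nulls
def mostFrequentValues(train: DataFrame, columns: Seq[String]): Map[String, String] =
  columns.map { c =>
    val mode = train
      .filter(col(c).isNotNull)
      .groupBy(c).agg(count("*").as("cnt"))
      .orderBy(desc("cnt"))
      .head().getString(0)
    c -> mode
  }.toMap

// usage: fill the nulls in validation/test data with the values learned on train
// val fillMap = mostFrequentValues(trainData, categoricalColumns)
// val imputed = validationData.na.fill(fillMap)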

However, this setup is very ugly, as tools like CrossValidator no longer work (since I can't supply a single pipeline).

How can I access the fitted labels within the pipeline to build an in-pipeline Transformer which sets new values to null?

Do you see a better approach for handling new values? I assume most-frequent imputation is acceptable, i.e. for a dataset with around 90 columns only very few columns will contain an unseen label.


Solution

  • I finally realized that this functionality needs to reside in the pipeline to work properly, i.e. it requires an additional, custom PipelineStage component; a minimal sketch of such a stage follows.
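A minimal sketch of what such a stage could look like, assuming an Estimator/Model pair so the known labels are learned during fit. The class and parameter names (UnseenLabelNuller, inputCol) are illustrative, not part of any Spark API.

import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.types.StructType

// Learns the labels seen in the training data for one string column (fit) and,
// at transform time, replaces any value not seen during fit with null so that a
// downstream most-frequent imputer can fill it.
class UnseenLabelNuller(override val uid: String) extends Estimator[UnseenLabelNullerModel] {
  def this() = this(Identifiable.randomUID("unseenLabelNuller"))

  final val inputCol = new Param[String](this, "inputCol", "categorical column to clean")
  def setInputCol(value: String): this.type = set(inputCol, value)

  override def fit(dataset: Dataset[_]): UnseenLabelNullerModel = {
    // collect the distinct non-null labels observed during training
    val labels = dataset.select($(inputCol)).na.drop()
      .distinct().collect().map(_.getString(0))
    copyValues(new UnseenLabelNullerModel(uid, labels).setParent(this))
  }

  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): UnseenLabelNuller = defaultCopy(extra)
}

class UnseenLabelNullerModel(override val uid: String, val labels: Array[String])
    extends Model[UnseenLabelNullerModel] {

  final val inputCol = new Param[String](this, "inputCol", "categorical column to clean")
  def setInputCol(value: String): this.type = set(inputCol, value)

  override def transform(dataset: Dataset[_]): DataFrame = {
    val c = col($(inputCol))
    // keep labels seen during fit, null out anything new/unseen
    dataset.withColumn($(inputCol), when(c.isin(labels: _*), c).otherwise(lit(null)))
  }

  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): UnseenLabelNullerModel =
    copyValues(new UnseenLabelNullerModel(uid, labels), extra)
}

Because this is an ordinary PipelineStage, one instance per categorical column can be placed in front of the StringIndexers, the most-frequent imputer and the estimator inside a single Pipeline, so tools like CrossValidator work again.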