Tags: apache-spark, pipeline, apache-spark-ml, apache-spark-2.0

Add new fitted stage to an existing PipelineModel without fitting again


I would like to concatenate several trained Pipelines into one, which is similar to "Spark add new fitted stage to an existing PipelineModel without fitting again"; however, the solution quoted below is for PySpark.

> pipe_model_new = PipelineModel(stages = [pipe_model , pipe_model2])
> final_df = pipe_model_new.transform(df1)

In Apache Spark 2.0, the constructor of PipelineModel is marked as private, so it cannot be called from outside the package; in the Pipeline class, only the fit method creates a PipelineModel. Calling the constructor directly fails:

val pipelineModel =  new PipelineModel("randomUID", trainedStages)
val df_final_full = pipelineModel.transform(df)
Error:(266, 26) constructor PipelineModel in class PipelineModel cannot be accessed in class Preprocessor
    val pipelineModel =  new PipelineModel("randomUID", trainedStages)

Solution

  • There is nothing* wrong with using Pipeline and invoking its fit method. If a stage is a Transformer (and a PipelineModel is**), fit works like identity for it; see the sketch at the end of this answer.

    You can check the relevant Python code:

    if isinstance(stage, Transformer):
        transformers.append(stage)
        dataset = stage.transform(dataset)
    

    and the corresponding Scala code (the case in Pipeline.fit that passes an already-fitted Transformer stage straight through):

    case t: Transformer =>
      t

    This means that the fitting process will only validate the schema and create a new PipelineModel object.
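
    For illustration, here is a rough sketch (not the verbatim Spark source) of how stages are handled during fitting; fitStages and its parameters are hypothetical names:

    import org.apache.spark.ml.{Estimator, PipelineStage, Transformer}
    import org.apache.spark.sql.DataFrame

    // Sketch only: untrained Estimators are fitted, while already-fitted
    // Transformers (such as a PipelineModel) are passed through as-is.
    def fitStages(stages: Array[PipelineStage], dataset: DataFrame): Array[Transformer] =
      stages.map {
        case estimator: Estimator[_] => estimator.fit(dataset)
        case transformer: Transformer => transformer
        case other =>
          throw new IllegalArgumentException(s"Unsupported pipeline stage: $other")
      }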

    * The only possible concern is the presence of non-lazy Transformers; however, with the exception of the deprecated OneHotEncoder, the Spark core API doesn't provide any.

    ** In Python:

    from pyspark.ml import Transformer, PipelineModel
    
    issubclass(PipelineModel, Transformer)
    
    True 
    

    In Scala:

    import scala.reflect.runtime.universe.typeOf
    import org.apache.spark.ml._
    
    typeOf[PipelineModel] <:< typeOf[Transformer]
    
    Boolean = true
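
    Putting it together, a minimal Scala sketch of this approach, reusing the names from the question (pipe_model, pipe_model2, df1) and assuming they are two already-fitted PipelineModels and an input DataFrame:

    import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

    // Both stages are already-fitted PipelineModels, so fit only validates the
    // schema and wraps them into a new PipelineModel - no re-training happens.
    val pipeModelNew: PipelineModel = new Pipeline()
      .setStages(Array[PipelineStage](pipe_model, pipe_model2))
      .fit(df1)

    val finalDf = pipeModelNew.transform(df1)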