Join two Spark mllib pipelines together


I have two separate DataFrames, each of which goes through several different processing stages that I handle with mllib transformers in a pipeline.

I now want to join these two pipelines together, keeping the features (columns) from each DataFrame.

Scikit-learn has the FeatureUnion class for handling this, but I can't find an equivalent in mllib.

I could add a custom transformer stage at the end of one pipeline that takes the DataFrame produced by the other pipeline as an attribute and joins it in the transform method, but that seems messy.


Solution

  • Both Pipeline and PipelineModel are valid PipelineStages, so they can be combined in a single Pipeline. For example, with:

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    
    df = spark.createDataFrame([
        (1.0, 0, 1, 1, 0),
        (0.0, 1, 0, 0, 1)
    ], ("label", "x1", "x2", "x3", "x4"))
    
    pipeline1 = Pipeline(stages=[
        VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
    ])
    
    pipeline2 = Pipeline(stages=[
        VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
    ])
    

    you can combine Pipelines:

    Pipeline(stages=[
        pipeline1, pipeline2, 
        VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
    ]).fit(df).transform(df).show(truncate=False)
    
    +-----+---+---+---+---+---------+---------+-----------------+
    |label|x1 |x2 |x3 |x4 |features1|features2|features         |
    +-----+---+---+---+---+---------+---------+-----------------+
    |1.0  |0  |1  |1  |0  |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
    |0.0  |1  |0  |0  |1  |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
    +-----+---+---+---+---+---------+---------+-----------------+
    

    or pre-fitted PipelineModels:

    model1 = pipeline1.fit(df)
    model2 = pipeline2.fit(df)
    
    Pipeline(stages=[
        model1, model2, 
        VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
    ]).fit(df).transform(df).show()
    
    +-----+---+---+---+---+---------+---------+-----------------+
    |label| x1| x2| x3| x4|features1|features2|         features|
    +-----+---+---+---+---+---------+---------+-----------------+
    |  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
    |  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
    +-----+---+---+---+---+---------+---------+-----------------+
    

    So the approach I would recommend is to join the data beforehand, then fit and transform the whole DataFrame.
