
Spark: Use same OneHotEncoder on multiple dataframes


I have two DataFrames with the same columns and I want to convert a categorical column into a vector using one-hot encoding. The problem is that, for example, the training set may contain 3 unique values while the test set may contain fewer.

Training Set:        Test Set:

+------------+       +------------+
|    Type    |       |    Type    |
+------------+       +------------+
|     0      |       |     0      | 
|     1      |       |     1      | 
|     1      |       |     1      | 
|     3      |       |     1      | 
+------------+       +------------+

In this case the OneHotEncoder creates vectors of different lengths on the training and test sets (since each element of the vector represents the presence of a unique value).

Is it possible to use the same OneHotEncoder on multiple DataFrames? There is no fit method, so I don't know how I could do that. Thanks.


Solution

  • Spark >= 3.0:

    The old-style OneHotEncoder has been removed and OneHotEncoderEstimator has been renamed to OneHotEncoder:

    from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel
    
    encoder = (OneHotEncoder()
        .setInputCols(["type"])
        .setOutputCols(["encoded"])
        .setDropLast(False))
    

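    Fitting and applying the renamed encoder works the same way as in the 2.3 example below; a minimal sketch (assuming the same training and testing DataFrames used later in this answer):

    model = encoder.fit(training)        # type: OneHotEncoderModel
    model.transform(training).show()
    model.transform(testing).show()
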
    Spark >= 2.3:

    Spark 2.3 adds new OneHotEncoderEstimator and OneHotEncoderModel classes which work the way you expect them to here.

    from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel
    
    encoder = (OneHotEncoderEstimator()
        .setInputCols(["type"])
        .setOutputCols(["encoded"])
        .setDropLast(False))
    model = encoder.fit(training)  # type: OneHotEncoderModel
    
    model.transform(training).show()
    # +----+-------------+
    # |type|      encoded|
    # +----+-------------+
    # | 0.0|(4,[0],[1.0])|
    # | 1.0|(4,[1],[1.0])|
    # | 1.0|(4,[1],[1.0])|
    # | 3.0|(4,[3],[1.0])|
    # +----+-------------+
    
    model.transform(testing).show()
    # +----+-------------+
    # |type|      encoded|
    # +----+-------------+
    # | 0.0|(4,[0],[1.0])|
    # | 1.0|(4,[1],[1.0])|
    # | 1.0|(4,[1],[1.0])|
    # | 1.0|(4,[1],[1.0])|
    # +----+-------------+
    
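    If the test data can also contain categories that never appear in the training set, the estimator's handleInvalid parameter can be set to "keep" so that unseen values are mapped to an extra vector index instead of raising an error at transform time. A minimal sketch (not part of the original example):

    encoder_keep = OneHotEncoderEstimator(
        inputCols=["type"], outputCols=["encoded"],
        dropLast=False, handleInvalid="keep")

    # Fit on training; unseen categories in other data get the extra index
    model_keep = encoder_keep.fit(training)
    model_keep.transform(testing).show()
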

    Spark < 2.3:

    OneHotEncoder is not intended to be used alone. Instead, it should be part of a Pipeline where it can leverage column metadata (a Pipeline sketch follows the examples below). Consider the following example:

    training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(["type"])
    testing  = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(["type"])
    

    When you use the encoder directly, it has no knowledge of the context:

    from pyspark.ml.feature import OneHotEncoder
    
    encoder = OneHotEncoder().setOutputCol("encoded").setDropLast(False)
    
    
    encoder.setInputCol("type").transform(training).show()
    ## +----+-------------+
    ## |type|      encoded|
    ## +----+-------------+
    ## | 0.0|(4,[0],[1.0])|
    ## | 1.0|(4,[1],[1.0])|
    ## | 1.0|(4,[1],[1.0])|
    ## | 3.0|(4,[3],[1.0])|
    ## +----+-------------+
    
    
    encoder.setInputCol("type").transform(testing).show()
    ## +----+-------------+
    ## |type|      encoded|
    ## +----+-------------+
    ## | 0.0|(2,[0],[1.0])|
    ## | 1.0|(2,[1],[1.0])|
    ## | 1.0|(2,[1],[1.0])|
    ## | 1.0|(2,[1],[1.0])|
    ## +----+-------------+
    

    Now let's add the required metadata. This can be done, for example, by using StringIndexer:

    from pyspark.ml.feature import StringIndexer

    indexer = (StringIndexer()
      .setInputCol("type")
      .setOutputCol("type_idx")
      .fit(training))
    

    If you apply the encoder to the indexed column, you'll get consistent encoding on both data sets:

    (encoder.setInputCol("type_idx")
       .transform(indexer.transform(training))
       .show())
    
    ## +----+--------+-------------+
    ## |type|type_idx|      encoded|
    ## +----+--------+-------------+
    ## | 0.0|     1.0|(3,[1],[1.0])|
    ## | 1.0|     0.0|(3,[0],[1.0])|
    ## | 1.0|     0.0|(3,[0],[1.0])|
    ## | 3.0|     2.0|(3,[2],[1.0])|
    ## +----+--------+-------------+
    

    (encoder.setInputCol("type_idx")
       .transform(indexer.transform(testing))
       .show())

    ## +----+--------+-------------+
    ## |type|type_idx|      encoded|
    ## +----+--------+-------------+
    ## | 0.0|     1.0|(3,[1],[1.0])|
    ## | 1.0|     0.0|(3,[0],[1.0])|
    ## | 1.0|     0.0|(3,[0],[1.0])|
    ## | 1.0|     0.0|(3,[0],[1.0])|
    ## +----+--------+-------------+
    

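    As mentioned above, the indexer and the encoder can also be combined into a single Pipeline so that one fitted model is reused on both DataFrames; a minimal sketch using the training and testing DataFrames defined above:

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import OneHotEncoder, StringIndexer

    pipeline = Pipeline(stages=[
        StringIndexer(inputCol="type", outputCol="type_idx"),
        OneHotEncoder(inputCol="type_idx", outputCol="encoded", dropLast=False)
    ])

    # Fit once on the training data, then apply the fitted model to both sets
    pipeline_model = pipeline.fit(training)
    pipeline_model.transform(training).show()
    pipeline_model.transform(testing).show()
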
    Please note that the labels you get this way don't reflect the values in the input data. If consistent encoding is a hard requirement, you should provide the schema manually:

    from pyspark.sql.types import StructType, StructField, DoubleType
    
    meta = {"ml_attr": {
        "name": "type",
        "type": "nominal", 
        "vals": ["0.0", "1.0", "3.0"]
    }}
    
    schema = StructType([StructField("type", DoubleType(), False, meta)])
    
    training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(schema)
    testing  = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(schema)
    
    assert (
        encoder.setInputCol("type").transform(training).first()[-1].size == 
        encoder.setInputCol("type").transform(testing).first()[-1].size
    )