pyspark, apache-spark-sql, apache-spark-mllib, apache-spark-ml, one-hot-encoding

pyspark - Convert sparse vector obtained after one hot encoding into columns


I am using Apache Spark MLlib to handle categorical features with one-hot encoding. After running the code below I get a vector, c_idx_vec, as the output of the one-hot encoding. I understand how to interpret this output vector, but I cannot figure out how to convert it into columns so that I end up with a new, transformed dataframe. Take this dataset as an example:

>>> fd = spark.createDataFrame( [(1.0, "a"), (1.5, "a"), (10.0, "b"), (3.2, "c")], ["x","c"])
>>> ss = StringIndexer(inputCol="c",outputCol="c_idx")
>>> ff = ss.fit(fd).transform(fd)
>>> ff.show()

    +----+---+-----+
    |   x|  c|c_idx|
    +----+---+-----+
    | 1.0|  a|  0.0|
    | 1.5|  a|  0.0|
    |10.0|  b|  1.0|
    | 3.2|  c|  2.0|
    +----+---+-----+

By default, the OneHotEncoder will drop the last category:

>>> oe = OneHotEncoder(inputCol="c_idx",outputCol="c_idx_vec")
>>> fe = oe.transform(ff)
>>> fe.show()
    +----+---+-----+-------------+
    |   x|  c|c_idx|    c_idx_vec|
    +----+---+-----+-------------+
    | 1.0|  a|  0.0|(2,[0],[1.0])|
    | 1.5|  a|  0.0|(2,[0],[1.0])|
    |10.0|  b|  1.0|(2,[1],[1.0])|
    | 3.2|  c|  2.0|    (2,[],[])|
    +----+---+-----+-------------+
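(For reference, (2,[0],[1.0]) means a sparse vector of size 2 with the value 1.0 at index 0, and (2,[],[]) is the all-zeros vector standing in for the dropped last category. One quick way to check a vector is to expand it by hand:)

>>> from pyspark.ml.linalg import SparseVector
>>> SparseVector(2, [0], [1.0]).toArray()   # dense form: [1.0, 0.0]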

Of course, this behavior can be changed:

>>> oe.setDropLast(False)
>>> fl = oe.transform(ff)
>>> fl.show()

    +----+---+-----+-------------+
    |   x|  c|c_idx|    c_idx_vec|
    +----+---+-----+-------------+
    | 1.0|  a|  0.0|(3,[0],[1.0])|
    | 1.5|  a|  0.0|(3,[0],[1.0])|
    |10.0|  b|  1.0|(3,[1],[1.0])|
    | 3.2|  c|  2.0|(3,[2],[1.0])|
    +----+---+-----+-------------+

So, I wanted to know how to convert the c_idx_vec vector into a new dataframe like the one below:

[image: the desired dataframe, with one indicator column per category value]


Solution

  • Here is what you can do:

    >>> from pyspark.ml.feature import OneHotEncoder, StringIndexer
    >>>
    >>> fd = spark.createDataFrame( [(1.0, "a"), (1.5, "a"), (10.0, "b"), (3.2, "c")], ["x","c"])
    >>> ss = StringIndexer(inputCol="c",outputCol="c_idx")
    >>> ff = ss.fit(fd).transform(fd)
    >>> ff.show()
    +----+---+-----+
    |   x|  c|c_idx|
    +----+---+-----+
    | 1.0|  a|  0.0|
    | 1.5|  a|  0.0|
    |10.0|  b|  1.0|
    | 3.2|  c|  2.0|
    +----+---+-----+
    
    >>>
    >>> oe = OneHotEncoder(inputCol="c_idx",outputCol="c_idx_vec")
    >>> oe.setDropLast(False)
    OneHotEncoder_49e58b281387d8dc0c6b
    >>> fl = oe.transform(ff)
    >>> fl.show()
    +----+---+-----+-------------+
    |   x|  c|c_idx|    c_idx_vec|
    +----+---+-----+-------------+
    | 1.0|  a|  0.0|(3,[0],[1.0])|
    | 1.5|  a|  0.0|(3,[0],[1.0])|
    |10.0|  b|  1.0|(3,[1],[1.0])|
    | 3.2|  c|  2.0|(3,[2],[1.0])|
    +----+---+-----+-------------+
    
    // Get c and its respective index. The one-hot encoder puts each category at the same
    // index in the vector (see the note after this block for a shortcut via the fitted StringIndexerModel)
    
    >>> colIdx = fl.select("c","c_idx").distinct().rdd.collectAsMap()
    >>> colIdx
    {'c': 2.0, 'b': 1.0, 'a': 0.0}
    >>>
    >>> colIdx =  sorted((value, "ls_" + key) for (key, value) in colIdx.items())
    >>> colIdx
    [(0.0, 'ls_a'), (1.0, 'ls_b'), (2.0, 'ls_c')]
    >>>
    >>> newCols = list(map(lambda x: x[1], colIdx))
    >>> actualCol = fl.columns
    >>> actualCol
    ['x', 'c', 'c_idx', 'c_idx_vec']
    >>> allColNames = actualCol + newCols
    >>> allColNames
    ['x', 'c', 'c_idx', 'c_idx_vec', 'ls_a', 'ls_b', 'ls_c']
    >>>
    >>> def extract(row):
    ...     return tuple(map(lambda x: row[x], row.__fields__)) + tuple(row.c_idx_vec.toArray().tolist())
    ...
    >>> result = fl.rdd.map(extract).toDF(allColNames)
    >>> result.show(20, False)
    +----+---+-----+-------------+----+----+----+
    |x   |c  |c_idx|c_idx_vec    |ls_a|ls_b|ls_c|
    +----+---+-----+-------------+----+----+----+
    |1.0 |a  |0.0  |(3,[0],[1.0])|1.0 |0.0 |0.0 |
    |1.5 |a  |0.0  |(3,[0],[1.0])|1.0 |0.0 |0.0 |
    |10.0|b  |1.0  |(3,[1],[1.0])|0.0 |1.0 |0.0 |
    |3.2 |c  |2.0  |(3,[2],[1.0])|0.0 |0.0 |1.0 |
    +----+---+-----+-------------+----+----+----+
    
    // Typecast new columns to int
    
    >>> for col in newCols:
    ...     result = result.withColumn(col, result[col].cast("int"))
    ...
    >>> result.show(20, False)
    +----+---+-----+-------------+----+----+----+
    |x   |c  |c_idx|c_idx_vec    |ls_a|ls_b|ls_c|
    +----+---+-----+-------------+----+----+----+
    |1.0 |a  |0.0  |(3,[0],[1.0])|1   |0   |0   |
    |1.5 |a  |0.0  |(3,[0],[1.0])|1   |0   |0   |
    |10.0|b  |1.0  |(3,[1],[1.0])|0   |1   |0   |
    |3.2 |c  |2.0  |(3,[2],[1.0])|0   |0   |1   |
    +----+---+-----+-------------+----+----+----+
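
    // Shortcut for the colIdx step above: the fitted StringIndexerModel already stores its
    // labels in index order, so (keeping the same ls_ prefix) you could instead do:

    >>> sim = ss.fit(fd)                  # keep the fitted StringIndexerModel around
    >>> sim.labels                        # labels come back ordered by their index
    ['a', 'b', 'c']
    >>> newCols = ["ls_" + label for label in sim.labels]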
    

    Hope this helps!!
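
If you are on Spark 3.0 or later, note that OneHotEncoder takes inputCols/outputCols and must be fit before transforming. In that case, a sketch of an alternative that avoids the RDD round trip entirely is pyspark.ml.functions.vector_to_array; the ls_* names below are the same ones built above:

    >>> from pyspark.ml.feature import OneHotEncoder
    >>> from pyspark.ml.functions import vector_to_array
    >>>
    >>> # Spark 3.x: OneHotEncoder is an Estimator, so fit() first
    >>> oe3 = OneHotEncoder(inputCols=["c_idx"], outputCols=["c_idx_vec"], dropLast=False)
    >>> fl3 = oe3.fit(ff).transform(ff)
    >>>
    >>> # expand the vector into an array column, then pull out one element per category
    >>> arr = fl3.withColumn("arr", vector_to_array("c_idx_vec"))
    >>> result = arr.select(
    ...     *fl3.columns,
    ...     *[arr["arr"][i].cast("int").alias(name) for i, name in enumerate(newCols)]
    ... )
    >>> result.show()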