Search code examples
pythonpysparkapache-spark-sqlcategorical-dataapache-spark-ml

Pyspark: how to extract subcolumns and re-transform them to categorical variables


I'm having an issue with a spark dataframe coming from a RandomForestRegressor, which I need to join with another dataframe (the original data).

from pyspark import SparkContext, SparkConf
sc = SparkContext(conf=SparkConf())
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

Here's some sample data:

columns = ['pays', 'zol', 'group_cont_typ', 'id_periode_gestion', 'target']
vals = [('AE', 'AFRIC', 'DR', 201601, 34.67),
        ('AE', 'AFRIC', 'DR', 201602, 59.38),
        ('AE', 'ASIA', 'RF', 201601, 123.45),
        ('AE', 'ASIA', 'RF', 201602, 186.32)]

df = sqlContext.createDataFrame(vals, columns)
df.show()
+----+-----+--------------+------------------+------+
|pays|  zol|group_cont_typ|id_periode_gestion|target|
+----+-----+--------------+------------------+------+
|  AE|AFRIC|            DR|            201601| 34.67|
|  AE|AFRIC|            DR|            201602| 59.38|
|  AE| ASIA|            RF|            201601|123.45|
|  AE| ASIA|            RF|            201602|186.32|
+----+-----+--------------+------------------+------+

I have transformed 3 categorical variables in numerical ones to use them in prediction:

si_pays = StringIndexer(inputCol='pays', outputCol='pays_encode')
si_zol = StringIndexer(inputCol='zol', outputCol='zol_encode')
si_type = StringIndexer(inputCol='group_cont_typ', outputCol='group_cont_typ_encode')
df = si_pays.fit(df).transform(df)
df = si_zol.fit(df).transform(df)
df = si_type.fit(df).transform(df)

Then I've changed the format of the dataframe to fit the required format for RandomForestRegressor:

input_cols = ['pays_encode', 'zol_encode', 'group_cont_typ_encode', 'id_periode_gestion']
df = df.rdd.map(lambda x: (x['target'], 
                               Vectors.dense([x[col] for col in input_cols]))) \
        .toDF(["label", "features"]) \
        .select([F.col('label').cast(DoubleType()).alias('label'), 'features'])

This allows me to run the predictions (I normally train and test on different subsets, but for simplicity's sake, I do it all on the same dataframe):

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=120, maxDepth=8, maxBins=64)
model = rf.fit(df)
forecasts = model.transform(df)
forecasts.show()
+------+--------------------+-----------------+
| label|            features|       prediction|
+------+--------------------+-----------------+
| 34.67|[57.0,0.0,0.0,201...|38.58532881795905|
| 59.38|[57.0,0.0,0.0,201...|69.21916671695188|
|123.45|[57.0,8.0,1.0,201...|94.17987290587061|
|186.32|[57.0,8.0,1.0,201...| 91.3936760453811|
+------+--------------------+-----------------+

Now that we got to this point, here's my problem: I need to join the results with the original data to present them, and the way to join them is on the columns that were transformed.


So what I do is I extract the sub-columns of features:

from pyspark.sql.functions import udf, col

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

forecasts = forecasts.withColumn("feature", to_array(col("features"))).select(["prediction", "label"] +
                                                                                  [col("feature")[i] for i in range(4)])

I then want to get the variables back to their original categorical values with IndexToString, but that's what doesn't work.

back_to_pays = IndexToString().setInputCol("feature[0]").setOutputCol("pays")
back_to_zol = IndexToString().setInputCol("feature[1]").setOutputCol("zol")
back_to_type = IndexToString().setInputCol("feature[2]").setOutputCol("group_cont_typ")
forecasts = back_to_pays.transform(forecasts)
forecasts = back_to_zol.transform(forecasts)
forecasts = back_to_type.transform(forecasts)
forecasts.show()


Traceback (most recent call last):
"/runner.py", line 109, in _run_spark
    forecasts = back_to_pays.transform(forecasts)
File "/usr/hdp/2.6.2.0-205/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 114, in transform
File "/usr/hdp/2.6.2.0-205/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 149, in _transform
File "/usr/hdp/2.6.2.0-205/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/hdp/2.6.2.0-205/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/usr/hdp/2.6.2.0-205/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o354.transform.
: java.lang.ClassCastException: org.apache.spark.ml.attribute.UnresolvedAttribute$ cannot be cast to org.apache.spark.ml.attribute.NominalAttribute
    at org.apache.spark.ml.feature.IndexToString.transform(StringIndexer.scala:292)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

How would I go about the re-transformation to categorical data while extracting the sub-columns from the vector column ?


Solution

  • It would have been easier to join if there was id column but you can create on using row number and then join them using row number since the positions of rows don't change during transforms.

    from pyspark.sql.window import Window
    from pyspark.sql.functions import rowNumber
    
    w = Window().orderBy()
    
    df =  df.withColumn("rowid", rowNumber().over(w))
    forecasts =  forecasts.withColumn("rowid", rowNumber().over(w))
    
    mergedDF = df.join(forecasts, "rowid").drop("rowid")
    mergedDF.show()