I am trying to run a random forest classifier using pyspark ml (Spark 2.4.0), encoding the target labels with one-hot encoding (OHE). The model trains fine when I feed the labels as integers (via StringIndexer), but it fails when I feed one-hot encoded labels produced by OneHotEncoderEstimator. Is this a Spark limitation?
#%%
# Minimal reproduction: feeding a one-hot encoded column as the label of a
# Spark ML classifier.
# NOTE(review): the final fit() is *expected* to fail — Spark ML classifiers
# require labelCol to be a NumericType column, while the one-hot encoder
# emits a VectorUDT column.
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, RandomForestClassifier
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

# Test dataframe: (id, date, transaction, limit, amount)
tst = sqlContext.createDataFrame(
    [
        (1, '01/01/2020', 'buy', 10000, 2000),
        (1, '01/01/2020', 'sell', 10000, 3000),
        (1, '02/01/2020', 'buy', 10000, 1000),
        (1, '02/01/2020', 'sell', 1000, 2000),
        (2, '01/01/2020', 'sell', 1000, 3000),
        (2, '02/01/2020', 'buy', 1000, 1000),
        (2, '02/01/2020', 'buy', 1000, 100),
    ],
    schema=("id", "date", "transaction", "limit", "amount"),
)

# Label pipeline: string category -> numeric index -> one-hot vector.
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
ohe = OneHotEncoderEstimator(inputCols=['label'], outputCols=['label_ohe'], dropLast=False)
label_pipeline = Pipeline(stages=[str_indxr, ohe])

# Data pipeline: assemble features, then classify.
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit", "amount"], outputCol="features", handleInvalid='skip')
# labelCol='label_ohe' is a vector column — this is what triggers the
# IllegalArgumentException ("Column label_ohe must be of type numeric").
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
data_pipeline = Pipeline(stages=[vecAssembler, classifier])
data_fit = data_pipeline.fit(data_trans)  # raises IllegalArgumentException
I get this error:
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<ipython-input-18-f08a05d86e2c> in <module>()
1 if(train_labdata_rf):
----> 2 pipeline_trained,accuracy,test_result_rf = train_test("rf",train_d,test_d)
3 print("Test set accuracy = " + str(accuracy))
4 #pipeline_trained.write().overwrite().save("/projects/projectwbvplatformpc/dev/PS-ET_Pipeline/CDM_Classifier/output/pyspark_classifier/pipelines/random_forest")
5 else:
<ipython-input-4-9709037baa80> in train_test(modelname, train_data, test_data)
11 """
12 pipeline=create_pipeline(modelname)
---> 13 pipeline_fit = pipeline.fit(train_data)
14
15 result = pipeline_fit.transform(test_d)
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
293
294 def _fit(self, dataset):
--> 295 java_model = self._fit_java(dataset)
296 model = self._create_model(java_model)
297 return self._copyValues(model)
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
290 """
291 self._transfer_params_to_java()
--> 292 return self._java_obj.fit(dataset._jdf)
293
294 def _fit(self, dataset):
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
77 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
78 if s.startswith('java.lang.IllegalArgumentException: '):
---> 79 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
80 raise
81 return deco
IllegalArgumentException: u'requirement failed: Column label_ohe must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
I am not able to find any suitable resource on this. Any suggestion would be helpful.
Edit: PySpark does not support a vector as a target label; the label must be a single numeric column, so only the StringIndexer output (not one-hot encoding) works.
The problematic code is -
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
The issue is the type of labelCol=label_ohe: Spark requires the label column to be an instance of NumericType, but the output of the one-hot encoder is a Vector (VectorUDT).
Reference: the label-column validation in the Spark ML source (Classifier / Predictor on GitHub).
Use the StringIndexer output directly as the label, like this:
# Fixed version: use the numeric StringIndexer output directly as the label —
# no one-hot encoding of the target is needed (or allowed) for Spark ML
# classifiers.
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Test dataframe: (id, date, transaction, limit, amount)
tst = sqlContext.createDataFrame(
    [
        (1, '01/01/2020', 'buy', 10000, 2000),
        (1, '01/01/2020', 'sell', 10000, 3000),
        (1, '02/01/2020', 'buy', 10000, 1000),
        (1, '02/01/2020', 'sell', 1000, 2000),
        (2, '01/01/2020', 'sell', 1000, 3000),
        (2, '02/01/2020', 'buy', 1000, 1000),
        (2, '02/01/2020', 'buy', 1000, 100),
    ],
    schema=("id", "date", "transaction", "limit", "amount"),
)

# Label pipeline: StringIndexer alone — its numeric (double) output column
# is a valid labelCol for RandomForestClassifier.
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
label_pipeline = Pipeline(stages=[str_indxr])

# Data pipeline: assemble features, then classify on the indexed label.
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit", "amount"], outputCol="features", handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label')
data_pipeline = Pipeline(stages=[vecAssembler, classifier])
data_fit = data_pipeline.fit(data_trans)  # trains successfully