I am quite new to PySpark pipelines. I am trying to create the stages in a pipeline by passing the list below:
pipeline = Pipeline().setStages([indexer,assembler,dtc_model])
where I am applying feature indexing on multiple columns:
cat_col = ['Gender','Habit','Mode']
indexer = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(training_data_0) for column in cat_col]
On running fit on the pipeline I get the error below:
model_pipeline = pipeline.fit(train_df)
How can we pass the list to the stages? Is there any workaround, or a better way to do this?
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<command-3999694668013877> in <module>
----> 1 model_pipeline = pipeline.fit(train_df)
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
95 if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
96 raise TypeError(
---> 97 "Cannot recognize a pipeline stage of type %s." % type(stage))
98 indexOfLastEstimator = -1
99 for i, stage in enumerate(stages):
TypeError: Cannot recognize a pipeline stage of type <class 'list'>.
The error occurs because indexer is itself a list, so setStages([indexer, assembler, dtc_model]) receives a nested list, and Pipeline rejects any stage that is not an Estimator or a Transformer. Flatten everything into a single list of stages instead. Try below -
cat_col = ['Gender', 'Habit', 'Mode']
indexer = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(training_data_0) for column in cat_col]
assembler = VectorAssembler...
dtc_model = DecisionTreeClassifier...

# Build a flat list of stages: the indexers first, then the assembler and the classifier.
# Copy the indexer list so that appending does not also mutate indexer itself.
stages = list(indexer)
stages.append(assembler)
stages.append(dtc_model)

pipeline = Pipeline().setStages(stages)
model_pipeline = pipeline.fit(train_df)
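As a side note, since StringIndexer is an Estimator, you do not have to fit each indexer up front with training_data_0; you can pass the unfitted indexers and let pipeline.fit() fit everything in one pass. Below is a minimal self-contained sketch of that approach (the toy DataFrame and the "label" column are made up for illustration; plug in your own train_df, VectorAssembler arguments, and classifier settings):

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

spark = SparkSession.builder.getOrCreate()

# Toy data just for illustration -- replace with your real train_df.
train_df = spark.createDataFrame(
    [("M", "Smoker", "Car", 1.0), ("F", "NonSmoker", "Bus", 0.0)],
    ["Gender", "Habit", "Mode", "label"],
)

cat_col = ["Gender", "Habit", "Mode"]

# Unfitted StringIndexer estimators -- the pipeline fits them during pipeline.fit().
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in cat_col]

assembler = VectorAssembler(
    inputCols=[c + "_index" for c in cat_col], outputCol="features"
)
dtc_model = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Unpack the indexer list so setStages gets a flat list of stages.
pipeline = Pipeline(stages=[*indexers, assembler, dtc_model])
model_pipeline = pipeline.fit(train_df)

Keeping the indexers unfitted also means the whole pipeline is fit on the same data in one call, and model_pipeline.transform() will then apply the fitted indexers consistently to new data.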