pyspark, apache-spark-mllib, apache-spark-ml

Unable to pass a list of StringIndexers as a single stage in a model pipeline


I am fairly new to PySpark pipelines. I am trying to create the pipeline stages by passing the list below:

pipeline = Pipeline().setStages([indexer,assembler,dtc_model])

where I apply feature indexing to multiple columns:

cat_col = ['Gender','Habit','Mode']

indexer = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(training_data_0) for column in cat_col ]

When I run fit on the pipeline, I get the error shown in the traceback below:

model_pipeline = pipeline.fit(train_df)

How can I pass the list as a stage? Is there a workaround or a better way to do this?

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<command-3999694668013877> in <module>
----> 1 model_pipeline = pipeline.fit(train_df)

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
     95             if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
     96                 raise TypeError(
---> 97                     "Cannot recognize a pipeline stage of type %s." % type(stage))
     98         indexOfLastEstimator = -1
     99         for i, stage in enumerate(stages):

TypeError: Cannot recognize a pipeline stage of type <class 'list'>.


Solution

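  • setStages expects a flat list in which every element is an Estimator or a Transformer. Here indexer is itself a list, so the pipeline receives a list nested inside the stage list and rejects it. Add the indexers to the stage list individually, as below: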

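    from pyspark.ml import Pipeline
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.feature import StringIndexer, VectorAssembler

    cat_col = ['Gender','Habit','Mode']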
    indexer = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(training_data_0) for column in cat_col ]
    
    assembler = VectorAssembler...
    dtc_model = DecisionTreeClassifier...
    
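    # Build one flat list of stages: the fitted indexers first, then the assembler and the classifier.
    # Concatenating (rather than appending to indexer) leaves the original indexer list unchanged.
    stages = indexer + [assembler, dtc_model]
    pipeline = Pipeline().setStages(stages)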
    
    model_pipeline = pipeline.fit(train_df)
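
The flattening step above can also be pulled into a small helper. The sketch below is only an illustration: flatten_stages is a hypothetical name, not part of PySpark, and plain strings stand in for the real stage objects so that the snippet runs without a Spark session.

```python
from typing import Any, Iterable, List


def flatten_stages(items: Iterable[Any]) -> List[Any]:
    """Return a flat list, expanding any nested lists or tuples in place.

    Hypothetical helper (not a PySpark function): it only rearranges the
    objects it is given and never inspects or fits them.
    """
    flat: List[Any] = []
    for item in items:
        if isinstance(item, (list, tuple)):
            # A nested list such as the list of indexers: expand it recursively.
            flat.extend(flatten_stages(item))
        else:
            flat.append(item)
    return flat


# Stand-in values (assumptions for illustration); in the real pipeline these
# would be the StringIndexer stages, the VectorAssembler and the classifier.
indexers = ["Gender_indexer", "Habit_indexer", "Mode_indexer"]
stages = flatten_stages([indexers, "assembler", "dtc_model"])
print(stages)
```

With real stage objects, the same call would presumably look like Pipeline(stages=flatten_stages([indexer, assembler, dtc_model])), which hands the pipeline a flat list whatever mix of single stages and lists of stages was passed in.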