
can't apply sklearn.compose.ColumnTransformer on only one column of pandas dataframe


I have defined a custom transformer that takes a pandas DataFrame, applies a function to only one column, and leaves all the remaining columns untouched. The transformer works fine when I test it on its own, but not when I include it as part of a Pipeline.

Here's the transformer:

import re
from sklearn.base import BaseEstimator, TransformerMixin

class SynopsisCleaner(BaseEstimator, TransformerMixin):
    def __init__(self):
        return None
    
    def fit(self, X, y=None, **fit_params):
        # nothing to learn from data.
        return self
    
    def clean_text(self, text):
        text = text.lower()
        text = re.sub(r'@[a-zA-Z0-9_]+', '', text)                              # strip @mentions
        text = re.sub(r'https?://[A-Za-z0-9./]+', '', text)                     # strip http(s) URLs
        text = re.sub(r'www.[^ ]+', '', text)                                   # strip www links
        text = re.sub(r'[a-zA-Z0-9]*www[a-zA-Z0-9]*com[a-zA-Z0-9]*', '', text)  # strip leftover www...com fragments
        text = re.sub(r'[^a-zA-Z]', ' ', text)                                  # keep letters only
        text = [token for token in text.split() if len(token) > 2]              # drop tokens shorter than 3 characters
        text = ' '.join(text)
        return text
    
    def transform(self, X, y=None, **fit_params):
        for i in range(X.shape[0]):
            X[i] = self.clean_text(X[i])
        return X

When I test it manually like this, it works just as expected.

train_synopsis = SynopsisCleaner().transform(train_data['Synopsis'])

But when I include it as part of an sklearn Pipeline:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# part 1: defining a column transformer that learns on only one column and transforms it
synopsis_clean_col_tran = ColumnTransformer(transformers=[('synopsis_clean_col_tran', SynopsisCleaner(), ['Synopsis'])],
                                            # set remainder to passthrough to pass along all the un-specified columns untouched to the next steps
                                            remainder='passthrough')

# make a pipeline now with all the steps
pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
pipe_1.fit(train_data)

I get a KeyError, as shown below:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   2890             try:
-> 2891                 return self._engine.get_loc(casted_key)
   2892             except KeyError as err:

pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

KeyError: 0

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
16 frames
<ipython-input-10-3396fa5d6092> in <module>()
      6 # make a pipeline now with all the steps
      7 pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
----> 8 pipe_1.fit(train_data)

/usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in fit(self, X, y, **fit_params)
    352                                  self._log_message(len(self.steps) - 1)):
    353             if self._final_estimator != 'passthrough':
--> 354                 self._final_estimator.fit(Xt, y, **fit_params)
    355         return self
    356 

/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit(self, X, y)
    482         # we use fit_transform to make sure to set sparse_output_ (for which we
    483         # need the transformed data) to have consistent output type in predict
--> 484         self.fit_transform(X, y=y)
    485         return self
    486 

/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y)
    516         self._validate_remainder(X)
    517 
--> 518         result = self._fit_transform(X, y, _fit_transform_one)
    519 
    520         if not result:

/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in _fit_transform(self, X, y, func, fitted)
    455                     message=self._log_message(name, idx, len(transformers)))
    456                 for idx, (name, trans, column, weight) in enumerate(
--> 457                         self._iter(fitted=fitted, replace_strings=True), 1))
    458         except ValueError as e:
    459             if "Expected 2D array, got 1D array instead" in str(e):

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self, iterable)
   1027             # remaining jobs.
   1028             self._iterating = False
-> 1029             if self.dispatch_one_batch(iterator):
   1030                 self._iterating = self._original_iterator is not None
   1031 

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in dispatch_one_batch(self, iterator)
    845                 return False
    846             else:
--> 847                 self._dispatch(tasks)
    848                 return True
    849 

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in _dispatch(self, batch)
    763         with self._lock:
    764             job_idx = len(self._jobs)
--> 765             job = self._backend.apply_async(batch, callback=cb)
    766             # A job can complete so quickly than its callback is
    767             # called before we get here, causing self._jobs to

/usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in apply_async(self, func, callback)
    206     def apply_async(self, func, callback=None):
    207         """Schedule a func to be run"""
--> 208         result = ImmediateResult(func)
    209         if callback:
    210             callback(result)

/usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in __init__(self, batch)
    570         # Don't delay the application, to avoid keeping the input
    571         # arguments in memory
--> 572         self.results = batch()
    573 
    574     def get(self):

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self)
    251         with parallel_backend(self._backend, n_jobs=self._n_jobs):
    252             return [func(*args, **kwargs)
--> 253                     for func, args, kwargs in self.items]
    254 
    255     def __reduce__(self):

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in <listcomp>(.0)
    251         with parallel_backend(self._backend, n_jobs=self._n_jobs):
    252             return [func(*args, **kwargs)
--> 253                     for func, args, kwargs in self.items]
    254 
    255     def __reduce__(self):

/usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in _fit_transform_one(transformer, X, y, weight, message_clsname, message, **fit_params)
    726     with _print_elapsed_time(message_clsname, message):
    727         if hasattr(transformer, 'fit_transform'):
--> 728             res = transformer.fit_transform(X, y, **fit_params)
    729         else:
    730             res = transformer.fit(X, y, **fit_params).transform(X)

/usr/local/lib/python3.6/dist-packages/sklearn/base.py in fit_transform(self, X, y, **fit_params)
    569         if y is None:
    570             # fit method of arity 1 (unsupervised transformation)
--> 571             return self.fit(X, **fit_params).transform(X)
    572         else:
    573             # fit method of arity 2 (supervised transformation)

<ipython-input-6-004ee595d544> in transform(self, X, y, **fit_params)
     20     def transform(self, X, y=None, **fit_params):
     21         for i in range(X.shape[0]):
---> 22             X[i] = self.clean_text(X[i])
     23         return X

/usr/local/lib/python3.6/dist-packages/pandas/core/frame.py in __getitem__(self, key)
   2900             if self.columns.nlevels > 1:
   2901                 return self._getitem_multilevel(key)
-> 2902             indexer = self.columns.get_loc(key)
   2903             if is_integer(indexer):
   2904                 indexer = [indexer]

/usr/local/lib/python3.6/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   2891                 return self._engine.get_loc(casted_key)
   2892             except KeyError as err:
-> 2893                 raise KeyError(key) from err
   2894 
   2895         if tolerance is not None:

KeyError: 0

What am I doing wrong here?

EDIT 1: Without the brackets, i.e. with the column name specified as a plain string, this is the error I see:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-11-bdd42b09e2af> in <module>()
      6 # make a pipeline now with all the steps
      7 pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
----> 8 pipe_1.fit(train_data)

3 frames
/usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in fit(self, X, y, **fit_params)
    352                                  self._log_message(len(self.steps) - 1)):
    353             if self._final_estimator != 'passthrough':
--> 354                 self._final_estimator.fit(Xt, y, **fit_params)
    355         return self
    356 

/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit(self, X, y)
    482         # we use fit_transform to make sure to set sparse_output_ (for which we
    483         # need the transformed data) to have consistent output type in predict
--> 484         self.fit_transform(X, y=y)
    485         return self
    486 

/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y)
    536 
    537         self._update_fitted_transformers(transformers)
--> 538         self._validate_output(Xs)
    539 
    540         return self._hstack(list(Xs))

/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in _validate_output(self, result)
    400                 raise ValueError(
    401                     "The output of the '{0}' transformer should be 2D (scipy "
--> 402                     "matrix, array, or pandas DataFrame).".format(name))
    403 
    404     def _validate_features(self, n_features, feature_names):

ValueError: The output of the 'synopsis_clean_col_tran' transformer should be 2D (scipy matrix, array, or pandas DataFrame).

Solution

  • In your manual test, you are passing the Series train_data['Synopsis'], but the ColumnTransformer is passing the DataFrame train_data[['Synopsis']]. (So, to clarify the error: X[i] is trying to get the column named 0, which indeed does not exist.) You should be able to fix this simply by dropping the brackets around 'Synopsis' in the column specification of the transformer. From the docs:

    ...A scalar string or int should be used where transformer expects X to be a 1d array-like (vector), otherwise a 2d array will be passed to the transformer. ...

    That is,

    synopsis_clean_col_tran = ColumnTransformer(
        transformers=[('synopsis_clean_col_tran', SynopsisCleaner(), 'Synopsis')],
        # set remainder to passthrough to pass along all the un-specified columns untouched to the next steps
        remainder='passthrough',
    )
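
    To make the difference concrete, here is a quick check in pandas (assuming, as in the question, that train_data is a DataFrame with a 'Synopsis' column):

    type(train_data['Synopsis'])    # pandas Series (1D)    -- what the manual test passed in
    type(train_data[['Synopsis']])  # pandas DataFrame (2D) -- what the ['Synopsis'] list spec makes ColumnTransformer pass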
    

    Ah, but then ColumnTransformer complains that the output of your transformer is one-dimensional; that's unfortunate. I think the cleanest thing then is to switch your transform so that both its input and output are 2D (a sketch of that version follows the FunctionTransformer example below). If you'll only ever need DataFrames as input (no other sklearn transformers converting to numpy arrays), then this can be relatively simple using a FunctionTransformer instead of your custom class.

    from sklearn.preprocessing import FunctionTransformer

    def clean_text_frame(X):
        return X.applymap(clean_text)  # the function "clean_text" currently in your class.
    
    synopsis_clean_col_tran = ColumnTransformer(
        transformers=[('synopsis_clean_col_tran', FunctionTransformer(clean_text_frame), ['Synopsis'])],
        # set remainder to passthrough to pass along all the un-specified columns untouched to the next steps
        remainder='passthrough',
    )
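
    If you'd rather keep a custom class, here is a minimal sketch of that 2D-aware version (the name SynopsisCleaner2D is just for illustration; it assumes clean_text has been pulled out of the class as above, and that the ['Synopsis'] list spec is kept so the transformer receives a DataFrame):

    class SynopsisCleaner2D(BaseEstimator, TransformerMixin):
        def fit(self, X, y=None):
            # nothing to learn from data
            return self

        def transform(self, X):
            # X is the one-column DataFrame selected by ColumnTransformer;
            # applymap applies clean_text element-wise and keeps the output 2D.
            return X.applymap(clean_text)

    synopsis_clean_col_tran = ColumnTransformer(
        transformers=[('synopsis_clean_col_tran', SynopsisCleaner2D(), ['Synopsis'])],
        remainder='passthrough',
    )

    pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
    pipe_1.fit(train_data)

    Either way, the key point is that whatever the transformer returns must be 2D (a DataFrame, a 2D array, or a scipy matrix), otherwise ColumnTransformer's output validation raises the ValueError shown in EDIT 1.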