I have defined a custom transformer that takes a pandas DataFrame, applies a function to only one column, and leaves all the remaining columns untouched. The transformer works fine when I test it on its own, but not when I include it as part of a Pipeline.
Here's the transformer:
import re
from sklearn.base import BaseEstimator, TransformerMixin

class SynopsisCleaner(BaseEstimator, TransformerMixin):
    def __init__(self):
        return None
    def fit(self, X, y=None, **fit_params):
        # nothing to learn from data.
        return self
    def clean_text(self, text):
        text = text.lower()
        text = re.sub(r'@[a-zA-Z0-9_]+', '', text)
        text = re.sub(r'https?://[A-Za-z0-9./]+', '', text)
        text = re.sub(r'www.[^ ]+', '', text)
        text = re.sub(r'[a-zA-Z0-9]*www[a-zA-Z0-9]*com[a-zA-Z0-9]*', '', text)
        text = re.sub(r'[^a-zA-Z]', ' ', text)
        text = [token for token in text.split() if len(token) > 2]
        text = ' '.join(text)
        return text
    def transform(self, X, y=None, **fit_params):
        for i in range(X.shape[0]):
            X[i] = self.clean_text(X[i])
        return X
When I test it manually like this, it works just as expected.
train_synopsis = SynopsisCleaner().transform(train_data['Synopsis'])
But when I include it as part of an sklearn pipeline:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# part 1: defining a column transformer that learns on only one column and transforms it
synopsis_clean_col_tran = ColumnTransformer(
    transformers=[('synopsis_clean_col_tran', SynopsisCleaner(), ['Synopsis'])],
    # set remainder to passthrough to pass along all the un-specified columns untouched to the next steps
    remainder='passthrough')

# make a pipeline now with all the steps
pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
pipe_1.fit(train_data)
I get a KeyError, as shown below:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
2890 try:
-> 2891 return self._engine.get_loc(casted_key)
2892 except KeyError as err:
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
KeyError: 0
The above exception was the direct cause of the following exception:
KeyError Traceback (most recent call last)
16 frames
<ipython-input-10-3396fa5d6092> in <module>()
6 # make a pipeline now with all the steps
7 pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
----> 8 pipe_1.fit(train_data)
/usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in fit(self, X, y, **fit_params)
352 self._log_message(len(self.steps) - 1)):
353 if self._final_estimator != 'passthrough':
--> 354 self._final_estimator.fit(Xt, y, **fit_params)
355 return self
356
/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit(self, X, y)
482 # we use fit_transform to make sure to set sparse_output_ (for which we
483 # need the transformed data) to have consistent output type in predict
--> 484 self.fit_transform(X, y=y)
485 return self
486
/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y)
516 self._validate_remainder(X)
517
--> 518 result = self._fit_transform(X, y, _fit_transform_one)
519
520 if not result:
/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in _fit_transform(self, X, y, func, fitted)
455 message=self._log_message(name, idx, len(transformers)))
456 for idx, (name, trans, column, weight) in enumerate(
--> 457 self._iter(fitted=fitted, replace_strings=True), 1))
458 except ValueError as e:
459 if "Expected 2D array, got 1D array instead" in str(e):
/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self, iterable)
1027 # remaining jobs.
1028 self._iterating = False
-> 1029 if self.dispatch_one_batch(iterator):
1030 self._iterating = self._original_iterator is not None
1031
/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in dispatch_one_batch(self, iterator)
845 return False
846 else:
--> 847 self._dispatch(tasks)
848 return True
849
/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in _dispatch(self, batch)
763 with self._lock:
764 job_idx = len(self._jobs)
--> 765 job = self._backend.apply_async(batch, callback=cb)
766 # A job can complete so quickly than its callback is
767 # called before we get here, causing self._jobs to
/usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in apply_async(self, func, callback)
206 def apply_async(self, func, callback=None):
207 """Schedule a func to be run"""
--> 208 result = ImmediateResult(func)
209 if callback:
210 callback(result)
/usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in __init__(self, batch)
570 # Don't delay the application, to avoid keeping the input
571 # arguments in memory
--> 572 self.results = batch()
573
574 def get(self):
/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self)
251 with parallel_backend(self._backend, n_jobs=self._n_jobs):
252 return [func(*args, **kwargs)
--> 253 for func, args, kwargs in self.items]
254
255 def __reduce__(self):
/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in <listcomp>(.0)
251 with parallel_backend(self._backend, n_jobs=self._n_jobs):
252 return [func(*args, **kwargs)
--> 253 for func, args, kwargs in self.items]
254
255 def __reduce__(self):
/usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in _fit_transform_one(transformer, X, y, weight, message_clsname, message, **fit_params)
726 with _print_elapsed_time(message_clsname, message):
727 if hasattr(transformer, 'fit_transform'):
--> 728 res = transformer.fit_transform(X, y, **fit_params)
729 else:
730 res = transformer.fit(X, y, **fit_params).transform(X)
/usr/local/lib/python3.6/dist-packages/sklearn/base.py in fit_transform(self, X, y, **fit_params)
569 if y is None:
570 # fit method of arity 1 (unsupervised transformation)
--> 571 return self.fit(X, **fit_params).transform(X)
572 else:
573 # fit method of arity 2 (supervised transformation)
<ipython-input-6-004ee595d544> in transform(self, X, y, **fit_params)
20 def transform(self, X, y=None, **fit_params):
21 for i in range(X.shape[0]):
---> 22 X[i] = self.clean_text(X[i])
23 return X
/usr/local/lib/python3.6/dist-packages/pandas/core/frame.py in __getitem__(self, key)
2900 if self.columns.nlevels > 1:
2901 return self._getitem_multilevel(key)
-> 2902 indexer = self.columns.get_loc(key)
2903 if is_integer(indexer):
2904 indexer = [indexer]
/usr/local/lib/python3.6/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
2891 return self._engine.get_loc(casted_key)
2892 except KeyError as err:
-> 2893 raise KeyError(key) from err
2894
2895 if tolerance is not None:
KeyError: 0
What am I doing wrong here?
EDIT 1: Without the brackets, and with the column name specified as a string, this is the error I see:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-11-bdd42b09e2af> in <module>()
6 # make a pipeline now with all the steps
7 pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
----> 8 pipe_1.fit(train_data)
3 frames
/usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in fit(self, X, y, **fit_params)
352 self._log_message(len(self.steps) - 1)):
353 if self._final_estimator != 'passthrough':
--> 354 self._final_estimator.fit(Xt, y, **fit_params)
355 return self
356
/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit(self, X, y)
482 # we use fit_transform to make sure to set sparse_output_ (for which we
483 # need the transformed data) to have consistent output type in predict
--> 484 self.fit_transform(X, y=y)
485 return self
486
/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y)
536
537 self._update_fitted_transformers(transformers)
--> 538 self._validate_output(Xs)
539
540 return self._hstack(list(Xs))
/usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in _validate_output(self, result)
400 raise ValueError(
401 "The output of the '{0}' transformer should be 2D (scipy "
--> 402 "matrix, array, or pandas DataFrame).".format(name))
403
404 def _validate_features(self, n_features, feature_names):
ValueError: The output of the 'synopsis_clean_col_tran' transformer should be 2D (scipy matrix, array, or pandas DataFrame).
In your manual test, you are passing the Series train_data['Synopsis'], but the column transformer is passing the DataFrame train_data[['Synopsis']]. (So, to clarify the error: X[i] is trying to get the column named 0, which indeed does not exist.) You should be able to fix this as easily as dropping the brackets around 'Synopsis' in the column specification of the transformer. From the docs:
...A scalar string or int should be used where transformer expects X to be a 1d array-like (vector), otherwise a 2d array will be passed to the transformer. ...
That is,
synopsis_clean_col_tran = ColumnTransformer(
    transformers=[('synopsis_clean_col_tran', SynopsisCleaner(), 'Synopsis')],
    # set remainder to passthrough to pass along all the un-specified columns untouched to the next steps
    remainder='passthrough',
)
Ah, but then ColumnTransformer complains that the output of your transformer is one-dimensional; that's unfortunate. I think the cleanest thing then is to switch your transform to expect both input and output as 2D. If you'll only ever need DataFrames as input (no other sklearn transformers converting to numpy arrays), then this can be relatively simple using a FunctionTransformer instead of your custom class:
from sklearn.preprocessing import FunctionTransformer

def clean_text_frame(X):
    return X.applymap(clean_text)  # the function "clean_text" currently in your class.

synopsis_clean_col_tran = ColumnTransformer(
    transformers=[('synopsis_clean_col_tran', FunctionTransformer(clean_text_frame), ['Synopsis'])],
    # set remainder to passthrough to pass along all the un-specified columns untouched to the next steps
    remainder='passthrough',
)
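Alternatively, if you would rather keep the custom class, here is a minimal sketch (my own adaptation, not your original code) of a 2D-in/2D-out version. It assumes clean_text has been pulled out of the class as a standalone function (as in the FunctionTransformer snippet above) and that the column is still specified as ['Synopsis'], so the transformer always receives a one-column DataFrame:

from sklearn.base import BaseEstimator, TransformerMixin

class SynopsisCleaner(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        # nothing to learn from the data
        return self
    def transform(self, X):
        # X arrives as a one-column DataFrame when the column is specified
        # as ['Synopsis']; applymap applies clean_text element-wise and
        # returns a DataFrame, so the output stays 2D and passes
        # ColumnTransformer's _validate_output check.
        return X.applymap(clean_text)  # clean_text as a standalone function, as above

Either way, the pipeline should then fit without the errors above. Note that with remainder='passthrough', the ColumnTransformer returns a NumPy array with the cleaned column first, followed by the untouched columns:

pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)])
cleaned = pipe_1.fit_transform(train_data)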