Tags: python, pandas, dataframe, accessor

Pandas Class with Pandas .pipe()


import pandas as pd


@pd.api.extensions.register_dataframe_accessor("data_cleaner")
class DataCleaner:

    def __init__(self, pandas_obj):
        self._obj = pandas_obj

    def multiply(self, col):
        self._obj[col] = self._obj[col] * self._obj[col]
        return self._obj 
    
    def square(self, col):
        self._obj[col] = self._obj[col]**2
        return self._obj 
    
    def add_strings(self, col):
        self._obj[col] = self._obj[col] + self._obj[col]
        return self._obj 

    def process_all(self):
        self._obj.pipe(
            self.multiply(col='A'),
            self.square(col='B'),
            self.add_strings(col='C')
        )
    

class DataProcessor(DataCleaner):

    data = [
        [1, 1.5, "AABB"],
        [2, 2.5, "BBCC"],
        [3, 3.5, "CCDD"],
        [4, 4.5, "DDEE"],
        [5, 5.5, "EEFF"],
        [6, 6.5, "FFGG"],
    ]

    def __init__(self):
        self.df = pd.DataFrame(data=DataProcessor.data, columns=['A', 'B', 'C'])

    def get_data(self):
        return self.df
    
    def clean_the_df(self, obj):
        obj = obj.data_cleaner.multiply(col='A')
        obj = obj.data_cleaner.square(col='B')
        obj = obj.data_cleaner.add_strings(col='C')
        return obj
    
    def process_all(self, obj):
        obj = obj.data_cleaner.process_all()


if __name__ == '__main__':
    

    data = DataProcessor().get_data()
    
    # this works
    print(DataProcessor().clean_the_df(data))
    
    # this does not work
    print(DataProcessor().process_all(data))

I want to use the pandas .pipe() function with a DataFrame accessor to chain methods together. In the DataCleaner class I have a method, process_all, that calls the other cleaning methods in the class. I want to chain them together and process the DataFrame with multiple methods in one go.

It would be nice to keep this chaining inside the DataCleaner class, so that all I have to do is call it once from another class or file, e.g. process_all inside DataProcessor.

That way I do not have to write out each method call one at a time, as in DataProcessor.clean_the_df().

The problem is that process_all raises: TypeError: 'DataFrame' object is not callable

So my question is: how do I use the DataFrame accessor's self._obj with .pipe() to chain multiple cleaning methods inside one function, so that I can call that function from another class and process a DataFrame in one go?

Desired output with process_all:

    A      B         C
0   1   2.25  AABBAABB
1   4   6.25  BBCCBBCC
2   9  12.25  CCDDCCDD
3  16  20.25  DDEEDDEE
4  25  30.25  EEFFEEFF
5  36  42.25  FFGGFFGG

Solution

  • The issue here is that .pipe expects, as its first argument, a function that takes a DataFrame, a Series, or a GroupBy object. The documentation is quite clear about that: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.pipe.html. In DataCleaner.process_all, each cleaning method is called first and its return value (a DataFrame) is handed to .pipe, which then tries to call that DataFrame; this is where the TypeError comes from. On top of that, a single .pipe call applies only one function. To chain several functions, the expected syntax is:

    >>> (df.pipe(h)
    ...    .pipe(g, arg1=a)
    ...    .pipe(func, arg2=b, arg3=c)
    ... )
    

    which is equivalent to

    >>> func(g(h(df), arg1=a), arg2=b, arg3=c)
    

    One way to combine the DataFrame accessor with .pipe is to define static methods in your DataCleaner class that take a DataFrame and a column as arguments. Here is an example that fixes your problem:

    import pandas as pd
    
    @pd.api.extensions.register_dataframe_accessor("data_cleaner")
    class DataCleaner:
        def __init__(self, pandas_obj):
            self._obj = pandas_obj
    
        @staticmethod
        def multiply(df, col):
            df[col] = df[col] * df[col]
            return df 
        
        @staticmethod
        def square(df, col):
            df[col] = df[col]**2
            return df
        
        @staticmethod
        def add_strings(df, col):
            df[col] = df[col] + df[col]
            return df 
    
        def process_all(self):
            # Pass each function itself (not its result); pipe calls it with the DataFrame
            self._obj = (self._obj.pipe(self.multiply, col='A')
                                  .pipe(self.square, col='B')
                                  .pipe(self.add_strings, col='C'))
            return self._obj
    
    class DataProcessor(DataCleaner):
        data = [
            [1, 1.5, "AABB"],
            [2, 2.5, "BBCC"],
            [3, 3.5, "CCDD"],
            [4, 4.5, "DDEE"],
            [5, 5.5, "EEFF"],
            [6, 6.5, "FFGG"],
        ]
        def __init__(self):
            self.df = pd.DataFrame(data=DataProcessor.data, columns=['A', 'B', 'C'])
    
        def get_data(self):
            return self.df
        
        def clean_the_df(self, obj):
            obj = obj.data_cleaner.multiply(obj, col='A') # modified to use static method
            obj = obj.data_cleaner.square(obj, col='B')
            obj = obj.data_cleaner.add_strings(obj, col='C')
            return obj
        
        def process_all(self, obj):
            obj = obj.data_cleaner.process_all()
            return obj
    

    With this code, running the following should yield:

    >>> data = DataProcessor().get_data()
    >>> print(DataProcessor().process_all(data))
        A      B         C
    0   1   2.25  AABBAABB
    1   4   6.25  BBCCBBCC
    2   9  12.25  CCDDCCDD
    3  16  20.25  DDEEDDEE
    4  25  30.25  EEFFEEFF
    5  36  42.25  FFGGFFGG
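
As a variation on the answer above, here is a minimal sketch, assuming you would rather keep ordinary instance methods on the accessor: each step is wrapped in a lambda so that .pipe still receives a callable. The accessor name cleaner_demo and the class CleanerDemo are hypothetical, and the methods use DataFrame.assign so that the input frame is left unmodified, which is a design choice not present in the original code. The sketch also reproduces the TypeError from the question in isolation.

```python
import pandas as pd


# Hypothetical accessor name, chosen so it does not clash with "data_cleaner".
@pd.api.extensions.register_dataframe_accessor("cleaner_demo")
class CleanerDemo:
    """Illustrative accessor whose methods return new frames instead of mutating."""

    def __init__(self, pandas_obj):
        self._obj = pandas_obj

    def square(self, col):
        # assign() returns a new DataFrame with the column replaced.
        return self._obj.assign(**{col: self._obj[col] ** 2})

    def add_strings(self, col):
        return self._obj.assign(**{col: self._obj[col] + self._obj[col]})

    def process_all(self):
        # pipe() calls its first argument with the DataFrame, so every step
        # must be a callable. Each lambda re-enters the accessor on the
        # intermediate frame it receives.
        return (
            self._obj
            .pipe(lambda d: d.cleaner_demo.square("A"))
            .pipe(lambda d: d.cleaner_demo.square("B"))
            .pipe(lambda d: d.cleaner_demo.add_strings("C"))
        )


df = pd.DataFrame(
    {"A": [1, 2, 3], "B": [1.5, 2.5, 3.5], "C": ["AABB", "BBCC", "CCDD"]}
)

# Reproduce the failure from the question: passing the *result* of a method
# call hands pipe() a DataFrame, which pipe() then tries to call.
try:
    df.pipe(df.cleaner_demo.square("A"))
    error_message = None
except TypeError as exc:
    error_message = str(exc)

result = df.cleaner_demo.process_all()
print(error_message)
print(result)
```

Because every step returns a new frame, df keeps its original values after process_all runs; if in-place modification is what you want, the static-method version above is the closer fit.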