pandas, parallel-processing, joblib

How to apply a class method to chunks of a pandas dataframe in parallel?


I have a dataframe df that I would like to break into smaller chunks column-wise so that I can apply a class method to each chunk. The method itself seems to work fine, but after around 10 iterations I keep getting AttributeError: Can't pickle local object 'delayed.<locals>.delayed_function' (raised from CustomizablePickler(buffer, self._reducers).dump(obj)), so I am not sure what's wrong.

What I have so far

from typing import Tuple

import pandas as pd
import numpy as np


class MyClass:
    def __init__(self, df: pd.DataFrame):
        self.df = df
    ...
    def run(self) -> Tuple[np.ndarray, np.ndarray]:
        <operate on self.df>


if __name__ == "__main__":
    from joblib import Parallel, delayed
    df = pd.read_csv("data.csv")

    n_cpu = 6
    chunk = df.shape[1] // n_cpu
    # create chunk column indices
    lower_idx = np.array([0, chunk, 2 * chunk, 3 * chunk, 4 * chunk, 5 * chunk])
    upper_idx = lower_idx + chunk
    upper_idx[-1] = df.shape[1]
    res = Parallel(n_jobs=n_cpu, backend="multiprocessing")(
        delayed(MyClass(df.iloc[:, i: j]).run()) for i, j in zip(lower_idx, upper_idx)
    )

The dataframe df is small and easily fits in memory (only 54 MB), but the operations in run() take a long time, so I would like to parallelise them over a subset of columns of df at a time.

Am I doing something wrong? Is there an easier way to do this? I don't need to use the joblib package, but I would like to leave MyClass unchanged if possible.


Solution

  • Use a module-level proxy function to run your class instance; unlike the local delayed_function closure that your delayed(...) call produced, a top-level function can be pickled and sent to worker processes:

    import pandas as pd
    import numpy as np
    import multiprocessing as mp
    from typing import Tuple
    
    class MyClass:
        def __init__(self, df: pd.DataFrame):
            self.df = df
    
        def run(self) -> Tuple[np.ndarray, np.ndarray]:
            # row-wise min and max over this chunk's columns
            # (self.df, not the global df, so each worker sees its own chunk)
            return (self.df.min(axis=1).values, self.df.max(axis=1).values)

    # A top-level function can be pickled by reference, so it is safe
    # to hand to worker processes.
    def proxy(df):
        c = MyClass(df)
        return c.run()
    
    if __name__ == '__main__':
        np.random.seed(2022)
        df = pd.DataFrame(np.random.randint(1, 1000, (10, 100)))
    
        n_cpu = 6
        # np.array_split tolerates uneven division (unlike np.split):
        # 100 columns over 6 chunks gives four chunks of 17 and two of 16
        chunks = np.array_split(df, n_cpu, axis=1)
    
        with mp.Pool(n_cpu) as pool:
            data = pool.map(proxy, chunks)
    

    Output:

    >>> data
    [(array([...]), array([...])),   # (row-wise min, row-wise max) of chunk 0
     (array([...]), array([...])),
     ...
     (array([...]), array([...]))]   # one tuple per column chunk, 6 in total
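
    If you would rather stay with joblib, note where the original error comes from: delayed(MyClass(...).run()) executes run() immediately and hands delayed its return value, so joblib ends up trying to pickle the local delayed_function closure, which cannot be pickled. delayed must wrap the callable itself, with the call arguments in a second pair of parentheses. A minimal sketch of the corrected call, reusing proxy and chunks from above:

        from joblib import Parallel, delayed

        # delayed() wraps the callable; the arguments go in the second
        # pair of parentheses and the call happens in the worker
        res = Parallel(n_jobs=6)(delayed(proxy)(chunk) for chunk in chunks)

    Passing the bound method directly, delayed(MyClass(chunk).run)(), should also work and leaves MyClass unchanged without a proxy.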
    

    Update

    Try replacing the with block with an explicitly forked pool:

        p = mp.get_context('fork').Pool(8)
        data = p.map(proxy, chunks)
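
    Note that the 'fork' start method is POSIX-only (it fails on Windows, and macOS has defaulted to 'spawn' since Python 3.8), and a pool created this way should be cleaned up when you are done. A context manager handles that, assuming the same proxy and chunks as above:

        # fork the workers explicitly and tear the pool down on exit
        with mp.get_context('fork').Pool(8) as p:
            data = p.map(proxy, chunks)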