Search code examples
python, pandas, pickle, python-multiprocessing

Python Pandas Multiprocessing Pickle Error


I have multiple .csv files in a folder. Each .csv file has trading data from a list of stocks. I wanted to take a specific section of data (in this case from the 'BABA' ticker) from each .csv, and then combine multiple days worth of sections. Due to the global interpreter lock it takes ~15 minutes to do this using a standard For Loop on 150 .csv files.

GOAL: speed up the For Loop using multiprocessing

PROBLEM: when using multiprocessing I receive the error: AttributeError: Can't pickle local object 'main.<locals>.compile'. Full traceback of error at bottom.

The below code is slow but works using a for loop:

# NOTE(review): as posted, everything (imports, helper, loop) is nested inside
# main(); the two lines after the loop are dedented to module level — see below.
def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    # Reads one CSV, keeps the BABA rows, adds the weighted-delta column,
    # and appends the frame to the enclosing stock_list (side effect, no return).
    # NOTE(review): the name shadows the builtin compile().
    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        # Open-interest-weighted average of Delta over the selected rows.
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using FOR LOOP ===

    for file in file_list:
        compile(file)

#=== combine dataframes from list into one dataframe and export

# NOTE(review): these two lines are dedented out of main(), so 'pd' and
# 'stock_list' (both local to main) are not in scope here — as written this
# raises NameError; presumably an indentation/paste error in the post.
stock_list_merged = pd.concat(stock_list)
stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

When I change the for loop to this code for multiprocessing I receive the error.

# NOTE(review): multiprocessing variant. Two problems are visible here:
# 1) compile is defined *inside* main(), so it is a local object; Pool.map
#    must pickle the callable to send it to worker processes, and local
#    functions cannot be pickled -> the AttributeError in the traceback.
# 2) stock_list.append(df) runs in the worker processes, so the parent's
#    stock_list would stay empty even if pickling succeeded; the per-file
#    frame should be *returned* and collected from pool.map's result.
def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    # Per-file worker: reads one CSV, keeps the BABA rows, adds the
    # weighted-delta column, appends to stock_list (no return value).
    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        # Open-interest-weighted average of Delta over the selected rows.
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using MULTIPROCESSING ===

    # Pickling of 'compile' fails right here, before any worker runs.
    pool = Pool(processes = (multiprocessing.cpu_count()-1))
    results = pool.map(compile, file_list)
    pool.close()
    pool.join()
    results_df = pd.concat(results)

#=== combine dataframes from list into one dataframe and export

# NOTE(review): dedented out of main() — 'pd' and 'stock_list' are main's
# locals and are not in scope at module level; see the formatting question
# raised in the answer below.
stock_list_merged = pd.concat(stock_list)
stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

Error Traceback: Traceback (most recent call last): File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 54, in <module> main() File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 41, in main results = pool.map(compile, file_list) File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 364, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 771, in get raise self._value File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks put(task) File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'main.<locals>.compile'


Solution

  • I'm not sure if you actually have your imports and other functions nested inside def main() - or if that's a code formatting issue?

    You need to format your code to something like:

    import glob
    import pandas as pd
    import numpy as np
    from   multiprocessing import Pool
    
    def compile(file):
        """Read one day's option-chain CSV and return the BABA rows with an
        added 'DMarket' column (open-interest-weighted average Delta,
        rounded to 4 decimals).

        Must live at module level so multiprocessing can pickle it.
        NOTE(review): the name shadows the builtin ``compile``; kept so the
        caller in main() is unaffected.
        """
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA']
    
        # Inlined the former nested market_delta() closure — it took no
        # arguments and was called exactly once, so it added only noise.
        # If no BABA rows exist the denominator is 0 and DMarket is NaN,
        # same as the original behavior.
        market_delta = np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta.round(4)
    
        return df
    
    def main():
        """Gather the BABA slice of every CSV in the data folder in parallel,
        concatenate the per-file frames, and export the combined result."""
        data_dir = '/Users/DataFiles'
        csv_files = glob.glob(data_dir + '/*.csv')
    
        # compile() is a module-level function, so the pool can pickle it and
        # each worker returns its DataFrame through map()'s result list.
        with Pool() as workers:
            frames = workers.map(compile, csv_files)
        combined = pd.concat(frames)
    
        print(combined)
        combined.to_csv('scholes expanded.csv', index = False)
    
    if __name__ == '__main__': 
        main()