I have multiple .csv files in a folder. Each .csv file has trading data for a list of stocks. I want to take a specific section of data (in this case, the 'BABA' ticker) from each .csv and combine multiple days' worth of sections. A standard for loop processes the files one at a time (and the global interpreter lock means threads wouldn't help), so this takes ~15 minutes across 150 .csv files.
GOAL: speed up the for loop using multiprocessing.
PROBLEM: when using multiprocessing I receive the error AttributeError: Can't pickle local object 'main.<locals>.compile'. Full traceback of the error at the bottom.
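The error can be reproduced in isolation; pickling any function defined inside another function fails the same way (a minimal sketch, standard library only):

import pickle

def main():
    def compile(file):  # nested inside main(), like in my script
        pass
    pickle.dumps(compile)  # AttributeError: Can't pickle local object 'main.<locals>.compile'

main()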
The following code is slow, but it works using a for loop:
def main():
    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles'  # multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        # select a certain section of data for the ticker from the .csv file
        df = df.loc[df['UnderlyingSymbol'] == 'BABA']

        # === make some changes to dataframe ===
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])

        df['DMarket'] = market_delta().round(4)

        # === append dataframe to list ===
        stock_list.append(df)

    # === using FOR LOOP ===
    for file in file_list:
        compile(file)

    # === combine dataframes from list into one dataframe and export ===
    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index=False)

if __name__ == '__main__':
    main()
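(A side note on the dataframe step: market_delta is an open-interest-weighted mean of the deltas, which np.average can express directly. A minimal sketch with made-up numbers:)

import numpy as np
import pandas as pd

# made-up rows standing in for one day's BABA section
df = pd.DataFrame({'Delta': [0.52, 0.31, 0.78],
                   'OpenInterest': [120, 340, 60]})

# same value as np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
print(round(np.average(df['Delta'], weights=df['OpenInterest']), 4))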
When I replace the for loop with the following multiprocessing code, I receive the error:
def main():
    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles'  # multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        # select a certain section of data for the ticker from the .csv file
        df = df.loc[df['UnderlyingSymbol'] == 'BABA']

        # === make some changes to dataframe ===
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])

        df['DMarket'] = market_delta().round(4)

        # === append dataframe to list ===
        stock_list.append(df)

    # === using MULTIPROCESSING ===
    pool = Pool(processes=(multiprocessing.cpu_count() - 1))
    results = pool.map(compile, file_list)
    pool.close()
    pool.join()
    results_df = pd.concat(results)

    # === combine dataframes from list into one dataframe and export ===
    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index=False)

if __name__ == '__main__':
    main()
Error Traceback:

Traceback (most recent call last):
  File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 54, in <module>
    main()
  File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 41, in main
    results = pool.map(compile, file_list)
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'main.<locals>.compile'
I'm not sure if you actually have your imports and other functions nested inside def main(), or if that's just a formatting issue in the question. Either way, the traceback tells you what's wrong: pool.map has to pickle compile in order to send it to the worker processes, and Python pickles functions by their qualified name, so a function defined inside main() (main.<locals>.compile) can't be pickled. Define compile at module level instead. Note also that appending to stock_list inside compile won't work under multiprocessing, because each worker process gets its own copy of the list; have compile return the dataframe and collect the results of pool.map. Restructure your code to something like:
import glob
import pandas as pd
import numpy as np
from multiprocessing import Pool

def compile(file):
    df = pd.read_csv(file)
    df = df.loc[df['UnderlyingSymbol'] == 'BABA']

    def market_delta():
        return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])

    df['DMarket'] = market_delta().round(4)
    return df  # return the result instead of appending to a shared list

def main():
    path = '/Users/DataFiles'
    file_list = glob.glob(path + '/*.csv')
    with Pool() as pool:
        results_df = pd.concat(pool.map(compile, file_list))
    print(results_df)
    results_df.to_csv('scholes expanded.csv', index=False)

if __name__ == '__main__':
    main()
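If you later want to run the same job for other tickers, one option (a sketch on my part, not something the question asked for; compile_ticker and the ticker parameter are my own names) is to pass the ticker as an argument and bind it with functools.partial, which remains picklable as long as the wrapped function lives at module level:

import glob
from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd

def compile_ticker(file, ticker):
    df = pd.read_csv(file)
    df = df.loc[df['UnderlyingSymbol'] == ticker]
    # open-interest-weighted average delta, same as market_delta() above
    df['DMarket'] = (np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])).round(4)
    return df

def main():
    file_list = glob.glob('/Users/DataFiles/*.csv')
    with Pool() as pool:
        # partial(...) pickles fine because compile_ticker is module-level
        results_df = pd.concat(pool.map(partial(compile_ticker, ticker='BABA'), file_list))
    results_df.to_csv('scholes expanded.csv', index=False)

if __name__ == '__main__':
    main()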