I have a set of stock price histories: 8160 CSV files under a folder 'Raw'. I then use pandas to do some adjustments on each of them and output a new file into another folder, 'Cleaned'. Any file that fails to process should be written into a folder called 'Failed'.
After I run the code below, there are no files under 'Failed', but around 100 files are missing: the 'Cleaned' folder only contains 8060 files.
I compared the two folders and found out which files are missing. All of them are in the correct format and process normally if I handle them the plain for-loop way, so I suspect that I am not using multiprocessing correctly.
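For reference, I found the missing files with a set difference between the two folder listings (a minimal sketch; `find_missing` is just an illustrative helper name, not part of my actual script):

```python
import os

def find_missing(raw_dir, clean_dir):
    # Files present in the raw folder but absent from the cleaned folder.
    return set(os.listdir(raw_dir)) - set(os.listdir(clean_dir))

# e.g. find_missing('Raw', 'Cleaned')
```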
Here is the code:
def cleanup(file):
    print('Working on ' + file)
    stock = pd.read_csv(raw_folder_path + file)
    try:
        del stock['Unnamed: 0']
        stock['Date'] = pd.to_datetime(stock['datetime'], unit='ms', utc=True).dt.tz_convert('America/New_York').dt.date
        stock.to_csv(clean_folder_path + file)
    except:
        print(file + ' Not Successfull')
        stock.to_csv(failed_folder_path + file)

# run multiprocessing
files_list = os.listdir(raw_folder_path)
pool = multiprocessing.Pool()
pool.map(cleanup, files_list)
I hope someone can guide me on where I went wrong. Thank you!
I've put recommendations into the comments of the question. The final code should probably look like this:
import multiprocessing, os
import pandas as pd

raw_folder_path = './input/'
clean_folder_path = './clean/'
failed_folder_path = './failed/'

def cleanup(file):
    print('Working on ' + file)
    stock = pd.read_csv(raw_folder_path + file)
    try:
        del stock['Unnamed: 0']
        stock['Date'] = pd.to_datetime(stock['datetime'], unit='ms', utc=True).dt.tz_convert('America/New_York').dt.date
        stock.to_csv(clean_folder_path + file)
        assert os.path.exists(clean_folder_path + file)
        return True
    except Exception as ex:
        print(file + ' not successful: ', ex)
        stock.to_csv(failed_folder_path + file)
        assert os.path.exists(failed_folder_path + file)
        return False

if __name__ == '__main__':
    files_list = os.listdir(raw_folder_path)
    with multiprocessing.Pool() as pool:
        results = pool.map(cleanup, files_list)
    assert all(results), f'{len(results) - sum(results)} wrong results!'
    assert len(results) == len(files_list)
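One further hardening worth considering: `pd.read_csv` sits outside the `try`, so a file that cannot be read at all raises in the worker instead of being recorded as a failure. A variant that keeps the read inside the `try` and returns per-file status tuples makes every failure inspectable afterwards (a sketch under the same folder layout; `process_one` is an illustrative name, not from the original code):

```python
import multiprocessing
import os

import pandas as pd

raw_folder_path = './input/'
clean_folder_path = './clean/'
failed_folder_path = './failed/'

def process_one(file):
    """Clean one CSV; return (file, None) on success or (file, error message)."""
    try:
        stock = pd.read_csv(os.path.join(raw_folder_path, file))
        # Drop the stray index column if present; ignore if it is not.
        stock = stock.drop(columns=['Unnamed: 0'], errors='ignore')
        stock['Date'] = (pd.to_datetime(stock['datetime'], unit='ms', utc=True)
                           .dt.tz_convert('America/New_York').dt.date)
        stock.to_csv(os.path.join(clean_folder_path, file), index=False)
        return file, None
    except Exception as ex:
        return file, str(ex)

# Driver runs only when executed directly and the input folder exists.
if __name__ == '__main__' and os.path.isdir(raw_folder_path):
    files = os.listdir(raw_folder_path)
    with multiprocessing.Pool() as pool:
        results = pool.map(process_one, files)
    failures = [(f, err) for f, err in results if err is not None]
    print(f'{len(results) - len(failures)} cleaned, {len(failures)} failed')
```

Collecting `(file, error)` pairs instead of writing to a 'failed' folder also avoids the case where `stock` is undefined in the `except` branch because the read itself failed.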