Tags: python, python-3.x, pandas, multiprocessing, python-multiprocessing

Missing/lost output files when using multiprocessing with Python and pandas


I have the price history for a set of stocks: 8160 files in a folder called 'Raw'. I use pandas to make some adjustments to each file, then write the result to another folder, 'Cleaned'. Any file that fails to process should be written to a folder called 'Failed'.

All files are CSV.

After running the code below, I found that there are no files in 'Failed', but around 100 files are missing: the 'Cleaned' folder only contains 8060 files.

I compared the folders and identified the missing files. All of them are in the correct format and are processed normally when I handle them in a plain for loop, so I suspect I am not using multiprocessing correctly.
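A minimal sketch of that folder comparison, assuming each output file keeps the same name as its input (as `cleanup` does). `find_missing` is a hypothetical helper, not part of the question's code:

```python
import os


def find_missing(raw_dir, *output_dirs):
    """Return the names in raw_dir that appear in none of output_dirs."""
    produced = set()
    for folder in output_dirs:
        # Collect every file name that was written to any output folder.
        produced.update(os.listdir(folder))
    return sorted(set(os.listdir(raw_dir)) - produced)
```

For example, `missing = find_missing('./Raw/', './Cleaned/', './Failed/')` followed by `print(len(missing), missing[:10])` gives both the count and a sample of the names (the folder paths here are placeholders).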

Here is the code:

import os
import multiprocessing
import pandas as pd

def cleanup(file):
    print('Working on ' + file)
    stock = pd.read_csv(raw_folder_path + file)

    try:
        del stock['Unnamed: 0']
        stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
        stock.to_csv(clean_folder_path + file)
    except:
        print(file + ' Not Successfull')
        stock.to_csv(failed_folder_path + file)

#run multiprocessing
files_list = os.listdir(raw_folder_path)
pool = multiprocessing.Pool()
pool.map(cleanup, files_list)
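The per-file adjustment converts the epoch-millisecond `datetime` column to a New York calendar date. A quick check on a single made-up value (not taken from the question's data) shows the effect: midnight UTC on 1 January 2021 is the previous evening in New York, so the resulting date is 2020-12-31.

```python
import pandas as pd

# 1609459200000 ms since the Unix epoch is 2021-01-01 00:00:00 UTC.
millis = pd.Series([1609459200000])
dates = (pd.to_datetime(millis, unit='ms', utc=True)
         .dt.tz_convert('America/New_York')  # UTC-5 in January
         .dt.date)                           # local calendar date
print(dates[0])  # 2020-12-31
```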

Could someone point out what I am doing wrong? Thank you!


Solution

  • I've put recommendations in the comments on the question. The final code should probably look like this:


    import multiprocessing
    import os
    import pandas as pd
    
    raw_folder_path = './input/'
    clean_folder_path = './clean/'
    failed_folder_path = './failed/'
    
    def cleanup(file):
        print('Working on ' + file)
        # Not inside the try: a read error here is re-raised by pool.map in the parent.
        stock = pd.read_csv(raw_folder_path + file)
    
        try:
            del stock['Unnamed: 0']
            stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
            stock.to_csv(clean_folder_path + file)
            assert os.path.exists(clean_folder_path + file)
            return True
        except Exception as ex:
            print(file + ' not successful:', ex)
            stock.to_csv(failed_folder_path + file)
            assert os.path.exists(failed_folder_path + file)
            return False
    
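    # Required with the 'spawn' start method (the default on Windows and macOS),
    # where each worker process re-imports this module.
    if __name__ == '__main__':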
        files_list = os.listdir(raw_folder_path)
        with multiprocessing.Pool() as pool:
            # map() blocks until every file has a result; the with block then shuts the pool down.
            results = pool.map(cleanup, files_list)
        assert all(results), f'{len(results) - sum(results)} wrong results!'
        assert len(results) == len(files_list)
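A further sketch, not part of the original answer: a variant that also catches errors from `pd.read_csv`, returns a `(name, status, error)` tuple for every file instead of raising, and checks that every input name comes back from the pool. `process_file`, `run`, and the `RAW`/`CLEAN`/`FAILED` constants are hypothetical names, and the paths are placeholders.

```python
import os
from collections import Counter
from multiprocessing import Pool

import pandas as pd

# Placeholder paths (hypothetical); point these at the real folders.
RAW, CLEAN, FAILED = './Raw/', './Cleaned/', './Failed/'


def process_file(name, raw_dir=RAW, clean_dir=CLEAN, failed_dir=FAILED):
    """Clean one CSV and return (name, status, error) instead of raising."""
    try:
        # Reading inside a try means an unreadable file is reported, not lost.
        stock = pd.read_csv(os.path.join(raw_dir, name))
    except Exception as ex:
        return name, 'unreadable', repr(ex)
    try:
        del stock['Unnamed: 0']
        stock['Date'] = (pd.to_datetime(stock['datetime'], unit='ms', utc=True)
                         .dt.tz_convert('America/New_York')
                         .dt.date)
        stock.to_csv(os.path.join(clean_dir, name))
        return name, 'clean', None
    except Exception as ex:
        stock.to_csv(os.path.join(failed_dir, name))
        return name, 'failed', repr(ex)


def run(processes=None):
    """Process every file in RAW with a worker pool and verify the results."""
    names = sorted(os.listdir(RAW))
    with Pool(processes) as pool:
        # Consume the iterator inside the with block: leaving the block
        # calls pool.terminate(), which would stop any unfinished work.
        results = list(pool.imap_unordered(process_file, names, chunksize=16))
    returned = {name for name, _, _ in results}
    missing = set(names) - returned
    if missing:
        raise RuntimeError(f'{len(missing)} files produced no result')
    print(Counter(status for _, status, _ in results))
    return results
```

Call `run()` from under `if __name__ == '__main__':`, as in the answer above. `imap_unordered` with a `chunksize` is just one option; `pool.map` works equally well here because all results are collected before checking.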