python, parallel-processing, multiprocessing

parallelize file processing (in loop)


I am trying to parallelize the processing of a set of files.

I am using this code:

import pandas as pd
import glob
from tqdm import tqdm
from multiprocessing import Process, Pool

SAVE_PATH = './tmp/saved/'
data_path = './tmp/data/'

def process_files(data_path):
    for idx_file, file in tqdm(enumerate(glob.glob(data_path + '*.txt'))):
        try:
            with open(file) as f:
                lines = f.readlines()
            # create a dataframe
            df = pd.DataFrame.from_dict({'title':lines})
            df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
            
        except Exception as ex:
            print('\nError in file: {}\n'.format(file))
            continue
        
processes = []

for i, input_file in enumerate(data_path):
    input_path = data_path[i]
    process_files(input_path)
    process = Process(target=process_files, args=(input_path))
    processes.append(process)
    process.start()
    print('Waiting for the parallel process...')
    process.join()

but it processes the files many times.

I need to keep the loop in the process_files function.
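
To illustrate the shape I am after: each process should get its own subset of the files and keep looping over them inside the worker, roughly like this sketch (the process_chunk name and the chunking step are just for illustration, not my real code):

import glob
from multiprocessing import Process

def process_chunk(files):
    # keep the loop inside the worker: each process handles its own subset of files
    for file in files:
        ...  # open the file, build the DataFrame, write the CSV (as above)

if __name__ == '__main__':
    all_files = glob.glob('./tmp/data/*.txt')
    n_procs = 4
    # round-robin split of the file list: one chunk per process
    chunks = [all_files[i::n_procs] for i in range(n_procs)]

    processes = [Process(target=process_chunk, args=(chunk,)) for chunk in chunks]
    for p in processes:
        p.start()
    for p in processes:
        p.join()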

So, I tried:

pool = Pool(processes=4)
pool.map(process_files, data_path)
pool.close()

but again, it processes the files multiple times.
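
(I suspect part of the problem is that data_path is a single string, so map iterates over it character by character and calls process_files once per character rather than once per file. A quick check that shows what the workers actually receive:)

from multiprocessing import Pool

def show(arg):
    print(repr(arg))  # prints '.', '/', 't', 'm', 'p', ... one call per character

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        pool.map(show, './tmp/data/')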

So, following the comments, I can now run this code:

import pandas as pd
import glob
from tqdm import tqdm
from multiprocessing import Pool

SAVE_PATH = './tmp/saved/'
data_path = './tmp/data/'

def process_files(input_data):
    file, idx_file = input_data 
    try:
        with open(file) as f:
            lines = f.readlines()

        df = pd.DataFrame.from_dict({'title':lines})
        df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
        print('Finished\n')
    except Exception as ex:
        print('\nError in file: {}\n'.format(file))
        

file = [f for f in glob.glob(data_path + '*.txt')]
idx_file = [idx for idx in range(len(file))]

input_data = zip(file, idx_file)

workers = 4
pool = Pool(processes=workers)
pool.map(process_files, input_data)
pool.close()

But when I extend this further for my case and try to append to a USER_ID list:

USER_ID = []
def process_files(input_data):
    file, idx_file = input_data 
    try:
        with open(file) as f:
            lines = f.readlines()

        USER_ID.append(idx_file)
        print(USER_ID[idx_file])
        # create a dataframe
        df = pd.DataFrame.from_dict({'title':lines})
        df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
        print('Finished\n')
    except Exception as ex:
        print('\nError in file: {}\n'.format(file))

I again get the 'Error in file' message.
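
Edit: I think the reason is that each worker process gets its own copy of the module-level USER_ID list, so the appends made in the workers never reach the parent, and USER_ID[idx_file] usually raises an IndexError inside the try block, which is what triggers the 'Error in file' message. A minimal sketch of a workaround, returning the id from the worker and collecting the results of pool.map in the parent instead of using a global:

import glob
import pandas as pd
from multiprocessing import Pool

SAVE_PATH = './tmp/saved/'
data_path = './tmp/data/'

def process_files(input_data):
    file, idx_file = input_data
    try:
        with open(file) as f:
            lines = f.readlines()
        df = pd.DataFrame.from_dict({'title': lines})
        df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
        return idx_file  # return the id instead of appending to a global
    except Exception:
        print('\nError in file: {}\n'.format(file))
        return None

if __name__ == '__main__':
    files = glob.glob(data_path + '*.txt')
    input_data = list(zip(files, range(len(files))))

    with Pool(processes=4) as pool:
        results = pool.map(process_files, input_data)

    # collect the ids in the parent process, skipping files that failed
    USER_ID = [idx for idx in results if idx is not None]
    print(USER_ID)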


Solution

  • Is something like this closer to what you're looking for? Edit: removed tqdm, which does not have good multiprocessing compatibility, and simplified the example with glob.

    from time import sleep
    import glob
    from multiprocessing import Pool
    
    data_path = r'C:\Users\username\python\311\Lib\multiprocessing'
    
    def process_file(idx_file, file):
        print(f"{idx_file}, {file}")
        sleep(1)
    
    if __name__ == "__main__":
        with Pool() as pool:
            pool.starmap(process_file, enumerate(glob.glob(data_path+r"\*.py")))
            pool.close() #close and join not strictly needed here, but good practice.
            pool.join() #leaving the `with Pool()` block with outstanding async tasks
                        #will terminate the pool before the work is done.
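
    Mapping this back to the original .txt-to-.csv task from the question, a sketch using the same starmap/enumerate pattern (paths and the pandas DataFrame step reused from the question's code) might look like this:

    import glob
    import pandas as pd
    from multiprocessing import Pool

    SAVE_PATH = './tmp/saved/'
    data_path = './tmp/data/'

    def process_file(idx_file, file):
        try:
            with open(file) as f:
                lines = f.readlines()
            df = pd.DataFrame({'title': lines})
            df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
        except Exception:
            print('\nError in file: {}\n'.format(file))

    if __name__ == "__main__":
        with Pool() as pool:
            # starmap unpacks each (idx_file, file) pair into the two arguments
            pool.starmap(process_file, enumerate(glob.glob(data_path + '*.txt')))
            pool.close()  # as above: close and join before leaving the with-block
            pool.join()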