I am trying to parallelize the processing of a set of files.
I am using this code:
import pandas as pd
import glob
from tqdm import tqdm
from multiprocessing import Process, Pool
SAVE_PATH = './tmp/saved/'
data_path = './tmp/data/'
def process_files(data_path):
    for idx_file, file in tqdm(enumerate(glob.glob(data_path + '*.txt'))):
        try:
            with open(file) as f:
                lines = f.readlines()
            # create a dataframe
            df = pd.DataFrame.from_dict({'title':lines})
            df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
        except Exception as ex:
            print('\nError in file: {}\n'.format(file))
            continue
processes = []
for i, input_file in enumerate(data_path):
    input_path = data_path[i]
    process_files(input_path)
    process = Process(target=process_files, args=(input_path))
    processes.append(process)
    process.start()
print('Waiting for the parallel process...')
process.join()
but it processes the files many times.
I need to keep the loop in the process_files
function.
So, I tried:
pool = Pool(processes=4)
pool.map(process_files, data_path)
pool.close()
but again, it processes the files multiple times.
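Looking at it again, the root cause seems to be that data_path is a string: both for i, input_file in enumerate(data_path) and pool.map(process_files, data_path) iterate it one character at a time, so process_files gets called once per character rather than once per directory (and the first version additionally calls process_files(input_path) directly in the parent before starting each Process). A quick check:

# iterating a string yields its characters, not file paths
print(list('./tmp/data/'))
# ['.', '/', 't', 'm', 'p', '/', 'd', 'a', 't', 'a', '/']
# so process_files runs once per character, and each call runs its own
# glob loop, which is presumably how files ended up processed repeatedly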
So, after the comments, I got the following version running:
import pandas as pd
import glob
from tqdm import tqdm
from multiprocessing import Pool
SAVE_PATH = './tmp/saved/'
data_path = './tmp/data/'
def process_files(input_data):
    file, idx_file = input_data
    try:
        with open(file) as f:
            lines = f.readlines()
        df = pd.DataFrame.from_dict({'title':lines})
        df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
        print('Finished\n')
    except Exception as ex:
        print('\nError in file: {}\n'.format(file))

file = [f for f in glob.glob(data_path + '*.txt')]
idx_file = [idx for idx in range(len(file))]
input_data = zip(file, idx_file)
workers = 4
pool = Pool(processes=workers)
pool.map(process_files, input_data)
pool.close()
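As an aside, dropping tqdm from the worker loop does not mean giving up the progress bar: it can live in the parent process instead. A sketch (reusing process_files and data_path from the version above; untested beyond the basics) that iterates pool.imap_unordered, so the bar advances as each file finishes:

import glob
from multiprocessing import Pool
from tqdm import tqdm

if __name__ == '__main__':
    files = glob.glob(data_path + '*.txt')
    input_data = list(zip(files, range(len(files))))
    with Pool(processes=4) as pool:
        # imap_unordered yields results as workers finish, so tqdm can count them
        for _ in tqdm(pool.imap_unordered(process_files, input_data),
                      total=len(input_data)):
            pass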
But, extending this further to my case, when I try to append to a USER_ID list:
USER_ID = []

def process_files(input_data):
    file, idx_file = input_data
    try:
        with open(file) as f:
            lines = f.readlines()
        USER_ID.append(idx_file)
        print(USER_ID[idx_file])
        # create a dataframe
        df = pd.DataFrame.from_dict({'title':lines})
        df.to_csv(SAVE_PATH + str(idx_file) + '.csv', index=False)
        print('Finished\n')
    except Exception as ex:
        print('\nError in file: {}\n'.format(file))
every file hits the except branch and I just get the 'Error in file' message.
Is something like this closer to what you're looking for? Edit: removed tqdm, which does not play well with multiprocessing, and simplified the example with glob.
from time import sleep
import glob
from multiprocessing import Pool

data_path = r'C:\Users\username\python\311\Lib\multiprocessing'

def process_file(idx_file, file):
    print(f"{idx_file}, {file}")
    sleep(1)  # simulate some per-file work

if __name__ == "__main__":
    with Pool() as pool:
        pool.starmap(process_file, enumerate(glob.glob(data_path + r"\*.py")))
        pool.close()  # close and join not strictly needed here, but good practice:
        pool.join()   # leaving the `with Pool()` block with outstanding async tasks
                      # will terminate the pool before the work is done.
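Two more notes. First, the if __name__ == "__main__": guard matters: with the spawn start method (the default on Windows) each worker re-imports the module, and without the guard the pool-creating code would run again in every child. Second, on the USER_ID part of your question: worker processes do not share memory, so each one appends to its own private copy of USER_ID; USER_ID[idx_file] then raises an IndexError as soon as idx_file outruns that worker's short list, which is what keeps triggering your except block. If you really need a shared list, a multiprocessing.Manager can proxy one across processes. A rough sketch (hypothetical names, adapt to your paths):

import glob
from multiprocessing import Pool, Manager

data_path = './tmp/data/'

def process_file(idx_file, file, user_id):
    user_id.append(idx_file)  # the append goes through the manager proxy, visible to all
    print(user_id[-1])        # index from the end: other workers append concurrently

if __name__ == "__main__":
    with Manager() as manager:
        user_id = manager.list()  # a process-safe shared list
        files = glob.glob(data_path + '*.txt')
        with Pool() as pool:
            pool.starmap(process_file, [(i, f, user_id) for i, f in enumerate(files)])
        print(list(user_id))  # snapshot into a plain list before the manager exits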