
Python: trouble saving several pool-processed files


I need to process some files in parallel. I'm using Pool, but I'm having trouble saving the pool-processed files. Here is the code:

... All imports...

def extract(text_lines):

    line_tr01 = []
    line_tr02 = []
    line_tr03 = []
    line_tr04 = []
    for line in text_lines:
        treatment01 = treatment_a(line, args)
        line_tr01.append(treatment01)
        treatment02 = treatment_b(line, args)
        line_tr02.append(treatment02)
        treatment03 = treatment_c(line, args)
        line_tr03.append(treatment03)
        treatment04 = treatment_d(line, args)
        line_tr04.append(treatment04)


for file in folder:
    text_lines = read_file_into_list(file_path)
    chunk_size=len(text_lines)/6
    divided=[]

    divided.append(text_lines[0:chunk_size])
    divided.append(text_lines[chunk_size:2*chunk_size])
    divided.append(text_lines[2*chunk_size:3*chunk_size])
    divided.append(text_lines[3*chunk_size:4*chunk_size])
    divided.append(text_lines[4*chunk_size:5*chunk_size])
    divided.append(text_lines[5*chunk_size:6*chunk_size])

    lines=[]
    p = Pool(6)
    lines.extend(p.map(extract(text_lines),divided))
    p.close()
    p.join()
    p.terminate()

    line_tr01=lines[0]
    with open(pkl_filename, 'wb') as f:
        pickle.dump(line_tr01, f)
    line_tr02=lines[1]
    with open(pkl_filename, 'wb') as f:
        pickle.dump(line_tr02, f)
    line_tr03=lines[2]
    with open(pkl_filename, 'wb') as f:
        pickle.dump(line_tr03, f)
    line_tr04=lines[3]
    with open(pkl_filename, 'wb') as f:
        pickle.dump(line_tr04, f)

Any light on how I can stop overwriting the files? Any help will be welcome. Thanks in advance.


Solution

  • So the problem is that when you split work across pool processes, you no longer have the common global namespace that you're currently (ab)using: each worker gets its own copy of the module, and `extract` never returns anything, so the results are lost. On top of that, `p.map(extract(text_lines), divided)` calls `extract` right there instead of passing the function itself to the pool. So let's just rewrite this to pass things properly.

    from multiprocessing import Pool

    def extract(text_lines):
        # Build all four treatment lists locally and return them,
        # instead of appending to globals the workers can't share.
        treatments = dict(tr01=[], tr02=[], tr03=[], tr04=[])
        for line in text_lines:
            treatments['tr01'].append(treatment_a(line, args))
            treatments['tr02'].append(treatment_b(line, args))
            treatments['tr03'].append(treatment_c(line, args))
            treatments['tr04'].append(treatment_d(line, args))
        return treatments

    def line_gen(lines, chunk_size=1):
        # Yield successive chunk_size-sized slices of lines.
        for i in range(0, len(lines), chunk_size):
            yield lines[i:i + chunk_size]

    for file in folder:
        text_lines = read_file_into_list(file)
        treatments = dict(tr01=[], tr02=[], tr03=[], tr04=[])
        # max(1, ...) guards against a zero chunk size on short files;
        # the with-block closes and joins the pool when it exits.
        chunk_size = max(1, len(text_lines) // 6)
        with Pool(6) as p:
            for treat_data in p.imap(extract, line_gen(text_lines, chunk_size=chunk_size)):
                for tr, data in treat_data.items():
                    treatments[tr].extend(data)

        # Do something with all your data in the treatments dict


    This should pile up all of the data in a dict called treatments, because extract now returns its results from the child processes instead of stashing them in globals, and then you can write the data out however you'd like.
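
    Since the original question was how to stop the pickle files from overwriting each other, here is a minimal sketch of the write-out step, meant to replace the # Do something comment above. The base_name variable is a hypothetical stand-in for whatever identifies the current input file; the point is that each treatment of each input file gets its own distinct filename:

    import os
    import pickle

    os.makedirs('pickles', exist_ok=True)
    base_name = 'somefile'  # hypothetical: derive this from `file` in your loop

    for tr, data in treatments.items():
        # One distinct filename per treatment and per input file,
        # e.g. pickles/somefile_tr01.pkl, pickles/somefile_tr02.pkl, ...
        # so the dumps no longer clobber each other.
        pkl_filename = os.path.join('pickles', '{}_{}.pkl'.format(base_name, tr))
        with open(pkl_filename, 'wb') as f:
            pickle.dump(data, f)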
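
    One caveat worth adding: on start methods that spawn a fresh interpreter for each worker (Windows, and macOS since Python 3.8), everything that creates the Pool must sit under an if __name__ == '__main__': guard, or the workers will re-execute the module and try to start pools of their own. A self-contained sketch of the shape, with a trivial stand-in for extract:

    from multiprocessing import Pool

    def square(x):
        # Trivial stand-in for extract(); any top-level function works.
        return x * x

    if __name__ == '__main__':
        # Pool creation lives under the main guard so spawned workers
        # importing this module don't recurse into it.
        with Pool(4) as p:
            print(p.map(square, range(10)))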