Search code examples
pythonwindowsiteratorpython-multiprocessinglarge-files

Reading and processing large text file in batches using islice and multiprocessing


code doesn't return anything, it keeps running forever. Please help with the code snippet. FYI: I am using multiprocessing for the first time.

I have low local memory, hence extracting data from a zip file. My idea is to read n lines at a time using islice and process them using process_logBatch().

Running this code on windows machine - Jupyter Notebook.

import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd  # Unused.

def process_logBatch(next_n_lines):
    l = [random.randint(0,100) for i in range(5)]
    print(l)
    return l

def collect_results(result):
    results.extend(result)

pool = mp.Pool(processes=(mp.cpu_count()-1))

results = []

with zipfile.ZipFile('log.zip', 'r') as z:
    with z.open('log.txt') as f:

        while True:
            print(f.closed)
            next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]

            if not next_n_lines:
                break

            try:
                pool.apply_async(process_logBatch, args=(next_n_lines, ), callback=collect_results)
            except Exception as e:
                print(e)

            if counter == 2:
                break
        pool.close()
        pool.join()

print(results)


Solution

  • There's a couple of problems. One is on Windows you need an if __name__ == '__main__': statement to protect the main module as shown and discussed in the section titled "Safe importing of main module" in the multiprocssing module's documentation.

    However, the second thing isn't so easily solved. Each process runs in its own memory space, so they don't all have the same results list. To avoid that I switched to using Pool.map_async() and collect the results when all the subprocesses have ended.

    Here's a way I think it will work (based on your sample code):

    import multiprocessing as mp
    import zipfile
    from itertools import islice
    import time
    #import pandas as pd  # Unused.
    import random  # Added.
    
    def process_logBatch(next_n_lines):
        l = [random.randint(0,100) for i in range(5)]
        print(l)
        return l
    
    if __name__ == '__main__':
    
    # Not longer needed.
    #    def collect_results(result):
    #        results.extend(result)
    
        pool = mp.Pool(processes=(mp.cpu_count()-1))
    
        with zipfile.ZipFile('log.zip', 'r') as z:
            with z.open('log.txt') as f:
    
                counter = 0  # Added to avoid NameError because undefined.
    
                while True:
                    next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]
    
                    if not next_n_lines:
                        break
    
                    try:
                        results = pool.map_async(process_logBatch, next_n_lines)
                    except Exception as e:
                        print(e)
    
                    if counter == 2:
                        break
    
                pool.close()
                pool.join()
    
        print(results.get())