Search code examples
pythonqueuepython-multiprocessing

Processes with python multiprocessing not ending


Trying to get multiprocessing working correctly. Have review lots of posts in Stack overflow but none seem to fit my issue. I have a batch of pdfs that I am extracting the text data from. Using multiprocessing with queue to speed up the process. My script starts the processes and extracts the text from the pdfs.

There is a print statement in the main.py that is never executed saying that there are processes that never finish.

print(f'Finished in {round(finish - start, 2)} seconds(s)')

My main.py read as

import multiprocessing
from multiprocessing import freeze_support, Queue  # Add this line to support Windows platform
import ExtractPDFData as eD
import time
import os


if __name__ == '__main__':
    freeze_support()
    files_to_process = []
    pdf_directory = 'PDFData'

    for each_pdf_file in os.listdir(pdf_directory):
        if each_pdf_file.endswith('.pdf'):
            files_to_process.append(os.path.join(pdf_directory, each_pdf_file))

    start = time.perf_counter()
    q = Queue()
    current_processes_array = []

    for pdf_file in files_to_process:
        p = multiprocessing.Process(target=eD.pdf_process, args=(pdf_file, q))
        p.start()
        current_processes_array.append(p)

    print(f"Number of active children: {len(multiprocessing.active_children())}")

    for process in current_processes_array:
        process.join()

    finish = time.perf_counter()

    print(f'Finished in {round(finish - start, 2)} seconds(s)')

    with open('OutputData/halo_data.txt', 'w', encoding='UTF-8', errors='ignore') as add_text_to_file:
        while not q.empty():
            add_text_to_file.write(q.get())
        add_text_to_file.close()

    with open('OutputData/halo_data.txt', 'r', encoding='UTF-8', errors='ignore') as f:
        print(f'Number of char in the input file: {len(f.read())}')

The attaching ExtractedPDFData.py is

from pypdf import PdfReader
import os


# Extracts text from PDF file
def pdf_process(pdf_path, q):
    with open(pdf_path, 'rb') as f:
        reader = PdfReader(f)
        cleaned_pages_array = []

        for page in range(len(reader.pages)):
            # Extract the text from each page
            page_of_text = reader.pages[page].extract_text()
            # Convert to lowercase
            page_of_text = page_of_text.lower()
            # Remove non-alphanumeric characters and extra whitespaces
            cleaned_pages_array.append(page_of_text)

        print('Number of pages in the array: ', len(cleaned_pages_array))
        text = " ".join(cleaned_pages_array)
        q.put(text)

        print("Have written to file " + pdf_path)

        print(f"Worker process ID: {os.getpid()}, which comes from the parent {os.getppid()}")

Solution

  • The problem occurs because the queue has not been consumed before attempting join() on each of the sub-processes.

    In fact, using a queue in this case only serves to makes matters more complex than they need to be. One could simply return the relevant data from the subprocesses and collect that in the main program.

    However, OP want to use a queue so here's a safer way to do that.

    First of all, we can simplify this:

    from pypdf import PdfReader
    
    def pdf_process(path, q):
        reader = PdfReader(path)
        fix = [_p.extract_text().lower() for _p in reader.pages]
        q.put(' '.join(fix))
    

    ...then...

    from multiprocessing import Pool, Manager
    from ExtractPDFData import pdf_process
    from time import perf_counter
    from glob import glob
    
    NPROCS = 9 # tested on 10-core machine so leave one spare
    
    OUTPUT = '/Volumes/G-Drive/halo_data.txt'
    INPUT = '/Volumes/G-Drive/Downloads/*.pdf'
    
    if __name__ == '__main__':
        with Manager() as manager:
            start = perf_counter()
            q = manager.Queue()
            with Pool(NPROCS) as pool:
                with open(OUTPUT, 'w') as add_text_to_file:
                    args = [(_f, q) for _f in glob(INPUT)]
                    for _ in pool.starmap(pdf_process, args):
                        add_text_to_file.write(q.get())
    
            print(f'Finished in {perf_counter()-start:.2f}s')
    
            with open(OUTPUT) as f:
                print(f'Output file size = {len(f.read())} bytes')
    

    In testing, 20 PDFs each with an average of 20 pages, runs in <2s after producing an output file with >520,000 bytes