python-3.x, python-multiprocessing

EOF error in multiprocessing while using dictionary type?


I have the following piece of code that does exactly what I want it to. However, for larger files it breaks and gives me an error:

ERROR: "raise EOFError"

import itertools
from multiprocessing import Manager, Process

def co(operation, in_queue, processed_lines):
    while True:
        item = in_queue.get()
        line_number, line = item

        if line is None:
            return

        line = line + operation + "changed"
        processed_lines[line_number] = line

def _fo(file_name, operation):

    manager = Manager()
    results = manager.dict()
    work = manager.Queue(10)

    pool = []
    for i in range(10):
        p = Process(target=co, args=(operation, work, results))
        p.start()
        pool.append(p)

    with open(file_name) as f:
        num_lines = 0
        iters = itertools.chain(f, (None,) * 10)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)
            num_lines += 1

    for p in pool:
        p.join()

    return [results[idx] for idx in range(num_lines - 10)]

Now I am aware that my main process is closing before my worker processes can write back to results; however, I am unable to resolve it. I am already gracefully closing my processes using p.join(). I tried putting p.close() before p.join(), but then it gives me the error: "'Process' object has no attribute 'close'".

What can I do to resolve this?

Error:

2020-10-01T15:55:22.488-05:00   item = in_queue.get()
2020-10-01T15:55:22.488-05:00   File "<string>", line 2, in get
2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/managers.py", line 757, in _callmethod
2020-10-01T15:55:22.488-05:00   kind, result = conn.recv()
2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 250, in recv
2020-10-01T15:55:22.488-05:00   buf = self._recv_bytes()
2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
2020-10-01T15:55:22.488-05:00   buf = self._recv(4)
2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
2020-10-01T15:55:22.488-05:00   raise EOFError
2020-10-01T15:55:22.488-05:00   EOFError

Solution

  • Now I am aware that my main process is closing before my worker processes can write back to results; however, I am unable to resolve it.

    So this does not seem to be a matter of input file size as such; larger input simply takes longer to process, and either you are willing to terminate the main program early, or you are choosing to exit the main process before processing has finished for some other reason. Either way, it sounds like you need a proper cancellation step that stops the workers before you move on and exit.

    I am already gracefully closing my processes using p.join().

    Process.join() is not really a graceful close of your main process. It only means that the calling scope blocks until each worker process in your list terminates. If for any reason you kill your app with a KeyboardInterrupt, or tell your main thread to exit while this is running in another thread, your main process will terminate and your child processes will hit an EOF when they try to read the next work item over the manager connection.

    The main and the child processes are set up so that every worker is sent a None value on the queue to signal it to exit, which in turn unblocks all of the Process.join() calls in the main process. If you exit your main process without first sending a None to each worker, you can hit the EOF problem because the workers have not been stopped.
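    A minimal sketch of that hand-off, using hypothetical names rather than your actual functions, might look like this: each worker exits when it reads the None sentinel, and the parent enqueues one sentinel per worker before joining.

    from multiprocessing import Manager, Process

    NUM_WORKERS = 10

    def worker(in_queue):
        while True:
            item = in_queue.get()
            if item is None:          # sentinel: no more work, exit cleanly
                return
            # ... process the item here ...

    def main():
        manager = Manager()
        work = manager.Queue()

        pool = [Process(target=worker, args=(work,)) for _ in range(NUM_WORKERS)]
        for p in pool:
            p.start()

        for item in ["line 1", "line 2", "line 3"]:
            work.put(item)

        # one sentinel per worker, so every blocked get() returns and each worker exits
        for _ in range(NUM_WORKERS):
            work.put(None)

        # join() only waits for the workers; the sentinels are what actually stop them
        for p in pool:
            p.join()

    if __name__ == "__main__":
        main()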

    I tried putting p.close() before p.join(), but then it gives me the error: "'Process' object has no attribute 'close'".

    https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.close
    "New in version 3.7."

    That implies you are running a Python version older than 3.7. You could call terminate() instead if your main process really has to exit early (kill(), like close(), was also only added in 3.7), but it would be best to stop sending lines to the workers, send the final None values so the workers stop gracefully, and then use the join() calls to wait for them.
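    If you really have to bail out early on a pre-3.7 Python, a rough sketch of that forceful fallback (assuming the same pool list of started Process objects as in your code) would be:

    for p in pool:
        p.terminate()   # forceful stop; a worker may be killed mid-item
    for p in pool:
        p.join()        # still join() afterwards so the processes are reaped

    Anything the workers had not yet written into the managed dict at that point will simply be missing from results.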

    with open(file_name) as f:
        num_lines = 0
        iters = itertools.chain(f, (None,) * 10)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)
            num_lines += 1
    

    This block of code iterates over every line, sending each one to the queue, and finally sends one None value for each of the workers (10 in this case). If you decide you want to cancel the work, you need to stop sending lines, send the 10 None values instead, and break, as sketched below.
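    As a sketch of that cancellation path, where should_cancel() is a hypothetical stand-in for whatever condition ends your run early:

    with open(file_name) as f:
        num_lines = 0
        for num_and_line in enumerate(f):
            if should_cancel():        # hypothetical early-exit check
                break
            work.put(num_and_line)
            num_lines += 1

    # always send one sentinel per worker, whether or not we stopped early;
    # the (num_lines, None) shape matches the unpacking done in co()
    for _ in range(10):
        work.put((num_lines, None))

    for p in pool:
        p.join()

    With this counting, num_lines holds only real lines, so the final read would become [results[idx] for idx in range(num_lines)] instead of range(num_lines - 10).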

    For anything more detailed than this, you would need to describe your cancellation circumstances.