I have the following piece of code that does exactly what I want. However, for larger input files it breaks and gives me an error:
ERROR: "raise EOFError"
import itertools
from multiprocessing import Manager, Process

def co(operation, in_queue, processed_lines):
    while True:
        item = in_queue.get()
        line_number, line = item
        if line is None:
            return
        line = line + operation + "changed"
        processed_lines[line_number] = line

def _fo(file_name, operation):
    manager = Manager()
    results = manager.dict()
    work = manager.Queue(10)
    pool = []
    for i in range(10):
        p = Process(target=co, args=(operation, work, results))
        p.start()
        pool.append(p)
    with open(file_name) as f:
        num_lines = 0
        iters = itertools.chain(f, (None,) * 10)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)
            num_lines += 1
    for p in pool:
        p.join()
    return [results[idx] for idx in range(num_lines - 10)]
Now I am aware that my main process is exiting before my worker processes can write back to results, but I am unable to resolve it. I am already closing my processes gracefully using p.join(). I tried putting p.close() before p.join() but then it gives me the error: "'Process' object has no attribute 'close'".
What can I do to resolve this?
Error:
2020-10-01T15:55:22.488-05:00 item = in_queue.get()
2020-10-01T15:55:22.488-05:00 File "<string>", line 2, in get
2020-10-01T15:55:22.488-05:00 File "/opt/python3.6/lib/python3.6/multiprocessing/managers.py", line 757, in _callmethod
2020-10-01T15:55:22.488-05:00 kind, result = conn.recv()
2020-10-01T15:55:22.488-05:00 File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 250, in recv
2020-10-01T15:55:22.488-05:00 buf = self._recv_bytes()
2020-10-01T15:55:22.488-05:00 File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
2020-10-01T15:55:22.488-05:00 buf = self._recv(4)
2020-10-01T15:55:22.488-05:00 File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
2020-10-01T15:55:22.488-05:00 raise EOFError
2020-10-01T15:55:22.488-05:00 EOFError
Now I am aware that my main process is exiting before my worker processes can write back to results, but I am unable to resolve it.
So this doesn't seem to be a matter of the input file's size so much as of a larger workload that simply takes more time than you are willing to give it before terminating the main program, or for some reason you are choosing to exit your main process before processing has finished. Either way, it sounds like you need a proper form of cancellation that stops the workers before you move on and exit.
I am already closing my processes gracefully using p.join().
Process.join() is not really a graceful close of your main process. It only means that the calling code blocks until the worker processes terminate on their own. If for any reason you kill your app with a KeyboardInterrupt, or tell your main thread to exit while this is running in another thread, your main process will terminate and your child processes will hit an EOFError when they try to read more work items from the parent process.
The main and the child processes are implemented such that every worker is sent a None value on the queue to signal it to exit, which in turn unblocks the Process.join() calls in the main process. If you exit your main process without first sending a None to each worker, you can hit the EOF problem because the workers have never been told to stop.
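To make that handshake concrete, here is a minimal sketch reusing the work and pool names from your code; the (None, None) tuple shape is only for illustration, since co() only checks whether the line part is None:

    for _ in range(10):            # one sentinel per worker in the pool
        work.put((None, None))     # line is None, so co() returns
    for p in pool:
        p.join()                   # unblocks only after every worker has consumed its sentinel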
I tried putting p.close() before p.join() but then it gives me the error: "'Process' object has no attribute 'close'".
https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.close
"New in version 3.7."
That implies you are running a Python version older than 3.7. You could call terminate or kill instead, if your main process is going to exit early. But it would be best to stop sending lines to your workers, send the final None values so the workers stop gracefully, and then use the join() calls to wait on them.
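If you really do need to bail out early on a pre-3.7 interpreter (no Process.close()), a rough sketch along these lines, reusing the pool list from your code, at least avoids leaving workers blocked on in_queue.get() against a dead manager connection:

    for p in pool:
        p.terminate()   # forceful stop; any item a worker is currently handling is lost
    for p in pool:
        p.join()        # still join() afterwards to reap the terminated processes

Keep in mind that terminate() is abrupt and can lose in-flight work, which is why the sentinel approach is preferable whenever you can use it.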
with open(file_name) as f:
    num_lines = 0
    iters = itertools.chain(f, (None,) * 10)
    for num_and_line in enumerate(iters):
        work.put(num_and_line)
        num_lines += 1
This block of code iterates over every line, sending each to the queue, and finally sends a None value for each of the workers (10 in this case). If you decide you want to cancel the work, then you need to stop sending lines, send the 10 None values instead, and break, as sketched below.
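As a rough sketch of what that could look like, where should_cancel() is a hypothetical stand-in for whatever your real cancellation condition is:

    with open(file_name) as f:
        num_lines = 0
        for num_and_line in enumerate(f):
            if should_cancel():
                break                  # stop feeding new lines to the workers
            work.put(num_and_line)
            num_lines += 1
        for _ in range(10):            # one sentinel per worker, sent exactly once
            work.put((None, None))
        for p in pool:
            p.join()                   # each worker exits after its sentinel, then join() returns

With the sentinels sent explicitly like this, the final result gathering would read range(num_lines) rather than range(num_lines - 10), since the sentinels are no longer counted in num_lines.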
For any more detail than this, you would need to describe the circumstances under which you want to cancel.