The code doesn't return anything; it just keeps running forever. Please help with the code snippet below. FYI: I am using multiprocessing for the first time.
I have low local memory, hence I'm extracting the data from a zip file. My idea is to read n lines at a time using islice and process them with process_logBatch().
I'm running this code on a Windows machine, in a Jupyter Notebook.
import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd # Unused.

def process_logBatch(next_n_lines):
    l = [random.randint(0, 100) for i in range(5)]
    print(l)
    return l

def collect_results(result):
    results.extend(result)

pool = mp.Pool(processes=(mp.cpu_count()-1))

results = []
with zipfile.ZipFile('log.zip', 'r') as z:
    with z.open('log.txt') as f:

        while True:
            print(f.closed)
            next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]

            if not next_n_lines:
                break

            try:
                pool.apply_async(process_logBatch, args=(next_n_lines, ), callback=collect_results)
            except Exception as e:
                print(e)

            if counter == 2:
                break

pool.close()
pool.join()

print(results)
There are a couple of problems. One is that on Windows you need an if __name__ == '__main__': statement to protect the main module, as shown and discussed in the section titled "Safe importing of main module" in the multiprocessing module's documentation.
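Stripped of everything else, the bare shape of that guard looks like this (a minimal sketch, not your full code):

import multiprocessing as mp

def worker(x):
    return x * x  # Runs in a child process.

if __name__ == '__main__':
    # On Windows, child processes re-import the main module, so anything
    # that spawns processes must sit behind this guard.
    with mp.Pool(2) as pool:
        print(pool.map(worker, range(5)))  # [0, 1, 4, 9, 16]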
However, the second issue isn't so easily solved. Each process runs in its own memory space, so the workers don't all share the same results list. To avoid that I switched to using Pool.map_async() and collect the results when all the subprocesses have ended.
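You can see that second problem in isolation with a small experiment (a sketch of the pitfall, not taken from your code): the worker appends to its own copy of the list, so the parent's list stays empty.

import multiprocessing as mp

results = []

def worker(x):
    results.append(x)  # Mutates the CHILD's copy of the list.

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        pool.map(worker, range(3))
    print(results)  # Prints [] -- the parent's list was never updated.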
Here's a way that I think will work (based on your sample code):
import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd # Unused.
import random # Added.

def process_logBatch(next_n_lines):
    l = [random.randint(0, 100) for i in range(5)]
    print(l)
    return l

if __name__ == '__main__':

    # No longer needed.
#    def collect_results(result):
#        results.extend(result)

    pool = mp.Pool(processes=(mp.cpu_count()-1))

    with zipfile.ZipFile('log.zip', 'r') as z:
        with z.open('log.txt') as f:

            counter = 0  # Added to avoid a NameError; never incremented,
                         # so the break below never triggers.

            while True:
                next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]
                if not next_n_lines:
                    break

                try:
                    results = pool.map_async(process_logBatch, next_n_lines)
                except Exception as e:
                    print(e)

                if counter == 2:
                    break

    pool.close()
    pool.join()

    print(results.get())
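One caveat about the loop above: results is rebound on every iteration, so results.get() at the end only returns the output of the final batch (the earlier batches still run and print, but their return values are discarded). Note also that map_async() calls process_logBatch() once per line rather than once per batch, which is harmless here because the function ignores its argument. If you want to keep every return value, one way (a sketch reusing the names above, keeping the per-line fan-out) is to store each AsyncResult and drain them after joining:

import multiprocessing as mp
import zipfile
from itertools import islice
import random

def process_logBatch(next_n_lines):
    return [random.randint(0, 100) for _ in range(5)]

if __name__ == '__main__':
    pool = mp.Pool(processes=mp.cpu_count() - 1)
    async_results = []  # One AsyncResult per submitted batch.

    with zipfile.ZipFile('log.zip', 'r') as z:
        with z.open('log.txt') as f:
            while True:
                next_n_lines = [x.decode('utf-8').strip() for x in islice(f, 2)]
                if not next_n_lines:
                    break
                # Keep each handle instead of rebinding a single name.
                async_results.append(pool.map_async(process_logBatch, next_n_lines))

    pool.close()
    pool.join()

    # .get() on each handle returns one result per line in that batch.
    print([r.get() for r in async_results])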