I'm reading a chunk from a big file, loading it in memory as a list of lines, then processing a task on every line.
The sequential solution was taking too long so I started looking at how to parallelize it.
The first solution I came up with is with Process and managing each subprocess' slice of the list.
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
for piece in read_in_chunks(file, CHUNKSIZE):
jobs = []
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len/N_PROCESSES)
start = 0
for process in range(N_PROCESSES):
finish = start + item_delta
p = mp.Process(target=work, args=(piece_list[start:finish]))
start = finish
jobs.append(p)
p.start()
for job in jobs:
job.join()
It completes each chunk in roughly 2498ms.
Then I discovered the Pool tool to automatically manage the slices.
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
It completes each chunk in roughly 15540ms, 6 times slower than manual but still faster than sequential.
Am I using the Pool wrong? Is there a better or faster way to do this?
Thank you for reading.
Update
The Pool has quite the overhead as Hannu suggested.
The work function called by the Process method is expecting a list of lines.
The work function called by the Pool method is expecting a single line because of how the Pool is deciding the slices.
I'm not quite sure how to make the pool give a certain worker more than one line at a time.
That should solve the problem?
Update 2
Final question, is there a 3rd better way to do it?
Oh boy! This was quite a ride to figure out, but very fun nonetheless.
The Pool.map is getting, pickling and passing every item individually from the iterator to each one of the workers. Once a worker is done, rinse and repeat, get -> pickle -> pass. This creates a noticeable overhead cost.
This is actually intended because the Pool.map isn't smart enough to know the length of the iterator, nor is able to effectively make a list of lists and passing each list inside it (chunk) to a worker.
But, it can be helped. Simply transforming the list to a list of chunks (lists) with a list comprehension works like a charm and reduces the overhead to the same level as the Process method.
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len / N_PROCESSES)
pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])
This Pool with a list of lists iterator has the exact same running time of the Process method.