Tags: python, performance, multiprocessing, python-multiprocessing, pool

Multiprocessing Pool much slower than manually instantiating multiple Processes


I'm reading chunks from a big file, loading each chunk into memory as a list of lines, then running a task on every line.

The sequential solution was taking too long so I started looking at how to parallelize it.

The first solution I came up with uses Process, manually managing each subprocess's slice of the list.

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # must be an int: file.read() expects a character count
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    for piece in read_in_chunks(file, CHUNKSIZE):
        jobs = []
        piece_list = piece.splitlines()
        piece_list_len = len(piece_list)
        item_delta = round(piece_list_len/N_PROCESSES)
        start = 0
        for process in range(N_PROCESSES):
            # The last process takes any remainder left over by the rounding.
            finish = start + item_delta if process < N_PROCESSES - 1 else piece_list_len
            # work() (defined elsewhere) processes a list of lines. Note the
            # trailing comma: args must be a tuple, otherwise the lines would
            # be unpacked as separate arguments.
            p = mp.Process(target=work, args=(piece_list[start:finish],))
            start = finish
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()

It completes each chunk in roughly 2498ms.

Then I discovered Pool, which manages the slicing automatically.

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # int, as above
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            pool.map(work, piece_list)  # note: work receives ONE line per call here

It completes each chunk in roughly 15540ms, about 6 times slower than the manual Process version but still faster than sequential.

Am I using the Pool wrong? Is there a better or faster way to do this?

Thank you for reading.

Update

As Hannu suggested, the Pool carries quite a lot of overhead.

The work function called via the Process method expects a list of lines.

The work function called via the Pool method expects a single line, because of how Pool decides the slices.
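
In other words, the two versions imply two incompatible signatures for work (handle here is a hypothetical stand-in for the actual per-line processing, which isn't shown in this post):

# Variant used with mp.Process: one call per slice of lines.
def work(lines):
    for line in lines:
        handle(line)  # handle() is hypothetical; the real per-line logic is elsewhere

# Variant used with pool.map: one call per single line.
def work(line):
    handle(line)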

I'm not quite sure how to make the Pool give a worker more than one line at a time.

Would that solve the problem?
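
It looks like Pool.map does expose exactly this knob: an optional chunksize argument that batches items before pickling. A minimal sketch, reusing the one-line-per-call work from above (not benchmarked):

with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            # chunksize ships lines to the workers in pickled batches,
            # while work still receives one line per call.
            pool.map(work, piece_list, chunksize=max(1, len(piece_list) // N_PROCESSES))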

Update 2

Final question: is there a third, better way to do it?
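
One idea I haven't benchmarked: stream lines straight from the file into the pool with imap_unordered, which never builds an intermediate list and again uses chunksize to batch the pickling. A sketch with the single-line work:

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
N_PROCESSES = mp.cpu_count()

if __name__ == '__main__':  # guard needed where the 'spawn' start method is used (e.g. Windows)
    with open(BIG_FILE_PATH, encoding="Latin-1") as file:
        with mp.Pool(N_PROCESSES) as pool:
            # A file object is a lazy iterator of lines (each keeps its
            # trailing newline, unlike splitlines()); results are consumed
            # as they finish, so nothing accumulates in memory.
            for _ in pool.imap_unordered(work, file, chunksize=10000):
                pass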


Solution

  • Oh boy! This was quite a ride to figure out, but very fun nonetheless.

    Pool.map gets, pickles, and passes items from the iterable to each of the workers. Once a worker is done, rinse and repeat: get -> pickle -> pass. With hundreds of thousands of tiny items, that per-item traffic creates a noticeable overhead cost.

    This is actually intended: Pool.map can't know how expensive each item is, and it won't build a list of lists and pass each inner list (a chunk) to a worker on its own (though, as noted above, its optional chunksize argument does batch items for pickling).

    But it can be helped: simply transforming the list into a list of chunks (lists) with a list comprehension works like a charm and brings the overhead down to the level of the Process method.

    import multiprocessing as mp
    
    BIG_FILE_PATH = 'big_file.txt'
    CHUNKSIZE = 1000000  # int, as above
    N_PROCESSES = mp.cpu_count()
    
    
    def read_in_chunks(file_object, chunk_size=1024):
        while True:
            data = file_object.read(chunk_size)
            if not data:
                break
            yield data
    
    
    with open(BIG_FILE_PATH, encoding="Latin-1") as file:
        with mp.Pool(N_PROCESSES) as pool:
            for piece in read_in_chunks(file, CHUNKSIZE):
                piece_list = piece.splitlines()
                piece_list_len = len(piece_list)
                item_delta = max(1, round(piece_list_len / N_PROCESSES))  # max(1, ...) guards against a zero range step on tiny chunks
                pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])
    

    This Pool fed with a list of lists has exactly the same running time as the Process method.