python memory multiprocessing python-3.11

Performance and Memory duplication when using multiprocessing with a multiple arguments function in Python?


I am having difficulty understanding the logic of optimizing multiprocessing in Python 3.11.

Context:

I am running on Ubuntu 22.04, x86 12 cores / 32 threads, 128 GB memory

Concerns:

(Please refer to code and result below).
Both multiprocessing runs that pass the df as a local argument (using map+partial or starmap) take far more time than the run that uses the global df.

It seemed ok to me but ...

... by enabling the sleep(10) in both functions, I noticed that every spawned process takes around 1.2 GB of memory (according to the system monitor in Ubuntu), so I guess the df must have been duplicated even when declared global.

My question (eventually :-))

  • Why do the multiprocessing runs using a local df take so much more time than the one using a global df, if memory is copied into each process either way (whether the df is local or global)?
  • Or does the system monitor in Ubuntu report the memory accessible to a process rather than its intrinsic footprint? (Shared memory would then appear duplicated in several processes, and the sum over all these processes could be greater than my system memory.)

Thank you for your enlightenment

Code

from time import sleep
from multiprocessing import Pool
from functools import partial
from time import time

import numpy as np
import pandas as pd


def _fun_local(df, imax: int):
    # sleep(10)
    df_temp = df[:imax]
    return df_temp


def _fun_global(imax: int):
    global mydf
    # sleep(10)
    df_temp = mydf[:imax]
    return df_temp


def dummy():
    # Create a 1 GB df
    global mydf
    n_rows = 13_421_773
    n_cols = 10
    data = np.random.rand(n_rows, n_cols)
    mydf = pd.DataFrame(data, columns=[f'col_{i}' for i in range(n_cols)])
    # check memory footprint
    print('mydf (MB): ', mydf.memory_usage(deep=True).sum() / 1024 / 1024)

    imaxs = range(1, 33)

    # With local df, call function with partial
    start = time()
    with Pool() as pool:
        fun = partial(_fun_local, mydf)
        results_partial = pool.map(fun, imaxs)
    print(f'local-partial took: {time()-start}')

    # With local df, call function with starmap
    start = time()
    with Pool() as pool:
        results_starmap = pool.starmap(_fun_local, [(mydf, imax) for imax in imaxs])
    print(f'local-starmap took: {time()-start}')

    # With global mydf
    start = time()
    with Pool() as pool:
        results_global = pool.map(_fun_global, imaxs)
    print(f'global took: {time()-start}')

    # results_local was never defined; return the three result lists that exist
    return results_partial, results_starmap, results_global

if __name__ == '__main__':
    results = dummy()

Result:

mydf (MB):  1024.0001411437988
local-partial took: 89.05407881736755
local-starmap took: 88.06274890899658
global took: 0.09803605079650879

Solution

  • When you fork a child process (in this case you are forking os.cpu_count() processes, the default pool size), it is true that the child process inherits the memory of the forking process. But copy-on-write semantics are used, so that when inherited memory is modified it is first copied, resulting in increased memory utilization. Even though the child process is not explicitly updating the global dataframe, once it is referenced it gets copied, because Python uses reference counts for memory management and the reference count for the dataframe gets incremented. Now consider the following code.

    from multiprocessing import Pool
    import os
    import time
    
    SLEEP = False
    
    some_data = [0, 1, 2, 3, 4, 5, 6, 7]
    
    def worker(i):
        if SLEEP:
            time.sleep(.5)
    
        return some_data[i], os.getpid()
    
    def main():
        with Pool(8) as pool:
            results = pool.map(worker, range(8))
            print('results =', results)
    
    if __name__ == '__main__':
        main()
    

    Prints:

    results = [(0, 35988), (1, 35988), (2, 35988), (3, 35988), (4, 35988), (5, 35988), (6, 35988), (7, 35988)]
    

    We see that a single pool process (PID=35988) processed all 8 submitted tasks. This is because each task was so short-running that a single pool process was able to pull all 8 tasks from the pool's task queue before the other pool processes had finished starting and attempted to process tasks. This also means that the global some_data was only referenced by a single pool process and therefore only copied once.

    If we change SLEEP to True, the output becomes:

    results = [(0, 45324), (1, 4828), (2, 19760), (3, 8420), (4, 41944), (5, 58220), (6, 46340), (7, 21628)]
    

    Each of the 8 pool processes processed one task and therefore some_data was copied 8 times.

    What Does This Mean?

    Your _fun_global worker function is also short-running when it is not sleeping, so probably only a single pool process handles all submitted tasks and the dataframe is copied only once. But if you do sleep, each pool process gets to process a task and the dataframe is copied N times, where N is the size of the pool (os.cpu_count()).

    But even when the inherited memory must be copied, this is a relatively fast operation compared to the situation where you are passing a local dataframe as an argument, in which case the copying is done by using pickle to first serialize the dataframe in the main process and pickle again to de-serialize the dataframe in the child process.
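
To get a feel for the serialization cost described above, the following minimal sketch times a pickle round trip on its own, outside of any pool. It only approximates what Pool does with task arguments. The helper name pickle_round_trip and the list of floats standing in for the dataframe are hypothetical, chosen for illustration.

```python
import pickle
import time


def pickle_round_trip(obj):
    """Serialize and deserialize obj, roughly as Pool does for task arguments."""
    start = time.perf_counter()
    payload = pickle.dumps(obj)    # would happen in the main process
    clone = pickle.loads(payload)  # would happen in the pool process
    elapsed = time.perf_counter() - start
    return clone, len(payload), elapsed


if __name__ == "__main__":
    # Hypothetical stand-in for the dataframe: one million floats.
    data = [float(i) for i in range(1_000_000)]
    clone, n_bytes, elapsed = pickle_round_trip(data)
    print(f"payload: {n_bytes} bytes, round trip: {elapsed:.4f} s")
    print("equal:", clone == data, "same object:", clone is data)
```

Passing mydf instead of the list should give a rough idea of the price paid each time the dataframe travels to a worker as an argument, which the global version never pays.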

    Summary

    1. Using a local dataframe is slower because copying a dataframe using pickle is slower than just doing a byte-for-byte copy.
    2. Using a global dataframe will result in lower memory utilization if your worker function is short-running so that a single pool process handles all submitted tasks resulting in the copy-on-write occurring only once.
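
On the question of what the system monitor actually reports: on Linux, a process's resident set size (Rss) counts pages shared with other processes in full, while the proportional set size (Pss) divides each shared page among the processes sharing it. The sketch below is one way to check this per process, assuming a kernel that provides /proc/<pid>/smaps_rollup. The helper names parse_smaps_rollup and memory_report are hypothetical, not part of any library.

```python
import os
from pathlib import Path


def parse_smaps_rollup(text):
    """Return {field: kB} for the 'Name:   123 kB' lines of an smaps_rollup dump."""
    fields = {}
    for line in text.splitlines():
        name, sep, rest = line.partition(":")
        parts = rest.split()
        # Skip the header line and anything that is not '<number> kB'.
        if sep and len(parts) == 2 and parts[1] == "kB" and parts[0].isdigit():
            fields[name.strip()] = int(parts[0])
    return fields


def memory_report(pid="self"):
    """Read Rss, Pss and private pages for a process (Linux only)."""
    text = Path(f"/proc/{pid}/smaps_rollup").read_text()
    fields = parse_smaps_rollup(text)
    private = fields.get("Private_Clean", 0) + fields.get("Private_Dirty", 0)
    return {
        "rss_kb": fields.get("Rss", 0),
        "pss_kb": fields.get("Pss", 0),
        "private_kb": private,
    }


if __name__ == "__main__":
    if Path("/proc/self/smaps_rollup").exists():
        print(os.getpid(), memory_report())
    else:
        print("smaps_rollup is not available on this system")
```

Calling memory_report() from inside a worker function such as _fun_global, while it sleeps, would show whether the roughly 1.2 GB seen per process is private to that process or mostly pages still shared with the parent.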