I am having difficulty understanding the logic of optimizing multiprocessing in Python 3.11.
I am running on Ubuntu 22.04, x86 12 cores / 32 threads, 128 GB memory
(Please refer to code and result below).
Both multiprocessing functions using a local df (via map+partial or starmap) take a lot more time than the multiprocessing function using the global df.
It seemed ok to me but ...
... by activating the sleep(10) in both functions, I noticed that every spawned process takes around 1.2 GB of memory (using the system monitor
in Ubuntu), so I guess the df must have been duplicated even when declared global.
My question (eventually :-))
Thank you for your enlightenment
from time import sleep
from multiprocessing import Pool
from functools import partial
from time import time
import numpy as np
import pandas as pd
def _fun_local(df, imax: int):
    """Return the leading slice ``df[:imax]`` of a dataframe passed as an argument.

    Args:
        df: The sliceable object (a dataframe in the benchmark) handed to the
            worker explicitly by the caller.
        imax: Exclusive upper bound of the slice.

    Returns:
        The result of ``df[:imax]``.
    """
    # sleep(10)  # uncomment to keep the worker alive while inspecting its memory
    return df[:imax]
def _fun_global(imax: int):
    """Return the leading slice ``mydf[:imax]`` of the module-level dataframe.

    Unlike ``_fun_local``, the dataframe is not passed in; it is read from the
    module global ``mydf``, which must have been assigned before this runs.

    Args:
        imax: Exclusive upper bound of the slice.

    Returns:
        The result of ``mydf[:imax]``.
    """
    # sleep(10)  # uncomment to keep the worker alive while inspecting its memory
    # Reading a module global needs no ``global`` declaration.
    return mydf[:imax]
def dummy():
    """Time three ways of giving pool workers access to a large dataframe.

    A dataframe of 13,421,773 x 10 random floats is created and stored in the
    module global ``mydf``. The same 32 slicing tasks are then run three times,
    each in a fresh ``Pool``:

    1. ``pool.map`` with ``functools.partial`` binding the dataframe,
    2. ``pool.starmap`` with the dataframe repeated in every argument tuple,
    3. ``pool.map`` over ``_fun_global``, which reads the global ``mydf``
       (workers only see it if they inherit the parent's globals, e.g. under
       the fork start method).

    The elapsed wall-clock time of each variant is printed.

    Returns:
        A tuple ``(results_local, results_global)``: the result lists of the
        partial-based local variant and of the global variant.
    """
    # Create a 1 GB df
    global mydf
    n_rows = 13_421_773
    n_cols = 10
    data = np.random.rand(n_rows, n_cols)
    mydf = pd.DataFrame(data, columns=[f'col_{i}' for i in range(n_cols)])
    # check memory footprint
    print('mydf', mydf.memory_usage(deep=True).sum() / 1024 / 1024)
    imaxs = range(1, 33)

    # With local df, call function with partial
    start = time()
    with Pool() as pool:
        fun = partial(_fun_local, mydf)
        # Bound to the name the return statement uses; the original returned
        # an undefined ``results_local`` and raised NameError.
        results_local = pool.map(fun, imaxs)
    print(f'local-partial took: {time()-start}')

    # With local df, call function with starmap
    start = time()
    with Pool() as pool:
        # Only timed; the slices are the same as in the partial variant.
        pool.starmap(_fun_local, [(mydf, imax) for imax in imaxs])
    print(f'local-starmap took: {time()-start}')

    # With global mydf
    start = time()
    with Pool() as pool:
        results_global = pool.map(_fun_global, imaxs)
    print(f'global took: {time()-start}')

    return results_local, results_global
# Entry point: run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    results = dummy()
mydf (MB): 1024.0001411437988
local-partial took: 89.05407881736755
local-starmap took: 88.06274890899658
global took: 0.09803605079650879
When you fork a child process (in this case you are forking multiprocessing.cpu_count()
processes) it is true that the child process inherits the memory of the forking process. But copy-on-write semantics is used such that when that inherited memory is modified, it is first copied resulting in increased memory utilization. Even though the child process is not explicitly updating the global dataframe, once it is referenced it gets copied because Python is using reference counts for memory management and the reference count for the dataframe will get incremented. Now consider the following code.
from multiprocessing import Pool
import os
import time
# Set to True to make every task pause briefly, so that the tasks are spread
# over several pool processes instead of being drained by the first one.
SLEEP = False
some_data = [0, 1, 2, 3, 4, 5, 6, 7]


def worker(i):
    """Return ``some_data[i]`` together with the PID of the executing process.

    Args:
        i: Index into the module-level list ``some_data``.

    Returns:
        A tuple ``(some_data[i], os.getpid())``.

    Raises:
        IndexError: If ``i`` is outside the bounds of ``some_data``.
    """
    if SLEEP:
        # The flag was defined but never consulted; honor it here so that
        # toggling it actually lengthens the task.
        time.sleep(.1)
    return some_data[i], os.getpid()
def main():
    """Run ``worker`` for indices 0..7 on an 8-process pool and print the results."""
    task_indices = range(8)
    pool = Pool(8)
    with pool:
        results = pool.map(worker, task_indices)
    print('results =', results)
# Entry point: the guard's body was missing, which is a syntax error.
if __name__ == '__main__':
    main()
results = [(0, 35988), (1, 35988), (2, 35988), (3, 35988), (4, 35988), (5, 35988), (6, 35988), (7, 35988)]
We see that a single pool process (PID=35988) processed all 8 submitted tasks. This is because the task was incredibly short running resulting in a single pool process being able to pull from the pool's task queue all 8 tasks before the other pool processes finished starting and attempted to process tasks. This also means that global some_data
was only referenced by a single pool process and therefore only copied once.
If now we change SLEEP
to True
, the output is now:
results = [(0, 45324), (1, 4828), (2, 19760), (3, 8420), (4, 41944), (5, 58220), (6, 46340), (7, 21628)]
Each of the 8 pool processes processed one task and therefore some_data
was copied 8 times.
What Does This Mean?
Your _fun_global
worker function is also short running if it is not sleeping and probably only a single pool process is processing all submitted tasks resulting in the dataframe being copied only once. But if you do sleep, then each pool process will get to process a task and the dataframe will be copied N times where N is the size of the pool (os.cpu_count()).
But even when the inherited memory must be copied, this is a relatively fast operation compared to the situation where you are passing a local dataframe as an argument, in which case the copying is done by using pickle
to first serialize the dataframe in the main process and pickle
again to de-serialize the dataframe in the child process.
This is slower than just doing a byte-for-byte copy.
Use Shared Memory To Reduce Memory Utilization
You can reduce memory by sharing a single copy of the dataframe among the processes:
1. Create the numpy array data.
2. Copy the data array into a shared memory array shared_array.
3. In each pool process, construct the dataframe based on the shared array.
from time import sleep
import multiprocessing
import numpy as np
import pandas as pd
def np_array_to_shared_array(np_array, lock=False):
    """Copy a numpy array into a newly allocated sharable byte array.

    Args:
        np_array: Source array. Its elements are stored in C (row-major) order.
        lock: Passed through to ``multiprocessing.Array``. Specify ``lock=True``
            if multiple processes might be updating the same array element.

    Returns:
        The ``multiprocessing.Array`` of ``np_array.nbytes`` unsigned bytes
        holding the raw data. Shape and dtype are not stored; callers must
        keep them to rebuild the array.
    """
    shared_array = multiprocessing.Array('B', np_array.nbytes, lock=lock)
    # With a lock the raw ctypes array sits behind get_obj(); without one the
    # returned object is itself the raw array.
    buffer = shared_array.get_obj() if lock else shared_array
    arr = np.frombuffer(buffer, np_array.dtype)
    # ravel() yields a view for C-contiguous input, whereas flatten() always
    # allocates a second full-size temporary before the copy.
    arr[:] = np_array.ravel(order='C')
    return shared_array
def shared_array_to_np_array(shared_array, shape, dtype):
    """Reconstruct a numpy array on top of a shared array's buffer.

    Args:
        shared_array: Either an object exposing ``get_obj()`` (whose return
            value is used as the buffer) or an object usable as a buffer
            directly.
        shape: Shape of the array to build.
        dtype: Element type of the array to build.

    Returns:
        A ``numpy.ndarray`` that uses the given buffer as its storage, so
        writes through the array are visible in the buffer.
    """
    # The original expression was missing its closing parenthesis.
    get_obj = getattr(shared_array, 'get_obj', None)
    buffer = get_obj() if get_obj is not None else shared_array
    return np.ndarray(shape, dtype=dtype, buffer=buffer)
def df_from_shared_array(shared_array: multiprocessing.Array, shape: tuple[int, int], dtype: np.dtype | str):
    """Construct the dataframe based on a sharable array.

    Args:
        shared_array: Array created by ``multiprocessing.Array`` holding the
            raw bytes of the data.
        shape: ``(n_rows, n_cols)`` of the data; ``shape[1]`` determines the
            number of generated column names.
        dtype: Element type of the data.

    Returns:
        A ``pandas.DataFrame`` with columns ``col_0`` .. ``col_{n_cols-1}``.
    """
    np_array = shared_array_to_np_array(shared_array, shape, dtype)
    column_names = [f'col_{i}' for i in range(shape[1])]
    # Request no copy explicitly: pandas versions running with copy-on-write
    # copy ndarray input by default, which would give every process a private
    # copy of the data instead of a frame over the shared buffer.
    return pd.DataFrame(np_array, columns=column_names, copy=False)
def init_pool(shared_array: multiprocessing.Array, shape: tuple[int, int], dtype: np.dtype | str):
    """Create the global dataframe ``mydf`` from a shared array.

    Intended to be used as the ``initializer`` of a ``multiprocessing.Pool``,
    so that it is executed for each pool process and gives each of them its
    own global variable ``mydf``.

    Args:
        shared_array: Array created by ``multiprocessing.Array`` holding the
            raw bytes of the data.
        shape: ``(n_rows, n_cols)`` of the data.
        dtype: Element type of the data.
    """
    # The original docstring was never closed, which made the file unparsable.
    global mydf
    mydf = df_from_shared_array(shared_array, shape, dtype)
def _fun(imax: int):
    """Return the leading slice ``mydf[:imax]`` of the process-global dataframe.

    The module global ``mydf`` must have been assigned in the executing
    process before this is called.
    """
    leading_slice = mydf[:imax]
    return leading_slice
def make_data(n_rows: int = 13_421_773, n_cols: int = 10):
    """Generate random data, move it into a sharable array and wrap it in a dataframe.

    Args:
        n_rows: Number of rows to generate. The default, combined with the
            default ``n_cols`` and 8-byte floats, amounts to roughly 1 GB.
        n_cols: Number of columns to generate.

    Returns:
        A tuple ``(shared_array, shape, dtype, mydf)``: the sharable array
        holding the data, the data's shape and dtype (needed to rebuild an
        array from the shared bytes), and a dataframe built from the shared
        array.
    """
    data = np.random.rand(n_rows, n_cols)
    shape, dtype = data.shape, data.dtype
    shared_array = np_array_to_shared_array(data)
    del data  # We no longer need this; the values now live in shared_array
    mydf = df_from_shared_array(shared_array, shape, dtype)
    return shared_array, shape, dtype, mydf
def dummy():
    """Slice the shared dataframe in a process pool and print every result.

    The data is created once by ``make_data``. Each pool process runs
    ``init_pool`` with the shared array, shape and dtype to set up its own
    global ``mydf``, which ``_fun`` then slices for ``imax`` = 1 .. n_cols.
    """
    # The dataframe returned for the main process is not needed here.
    shared_array, shape, dtype, _ = make_data()
    imaxs = range(1, shape[1] + 1)
    with multiprocessing.Pool(
        # Without an initializer the initargs are never used and the workers
        # have no global ``mydf`` for ``_fun`` to read.
        initializer=init_pool,
        initargs=(shared_array, shape, dtype)
    ) as pool:
        results = pool.map(_fun, imaxs)
    for result in results:
        print(result, end='\n\n')
if __name__ == '__main__':