I am having difficulties understanding the logic of optimizing multiprocessing in Python 3.11.
I am running on Ubuntu 22.04, x86 12 cores / 32 threads, 128 GB memory
(Please refer to code and result below).
Both multiprocessing variants that use a local df (map+partial or starmap) take a lot more time than the variant using the global df.
That seemed OK to me, but ...
... by activating the sleep(10) in both functions, I noticed that every spawned process takes around 1.2 GB of memory (observed with the Ubuntu system monitor), so I guess the df must have been duplicated even when declared global.
My question (eventually :-)): why is passing the df as an argument so much slower, and is the df really duplicated in every process even when it is declared global?
Thank you for your enlightenment
from time import sleep
from multiprocessing import Pool
from functools import partial
from time import time
import numpy as np
import pandas as pd
def _fun_local(df, imax: int):
# sleep(10)
df_temp = df[:imax]
return df_temp
def _fun_global(imax: int):
global mydf
# sleep(10)
df_temp = mydf[:imax]
return df_temp
def dummy():
# Create a 1 GB df
global mydf
n_rows = 13_421_773
n_cols = 10
data = np.random.rand(n_rows, n_cols)
mydf = pd.DataFrame(data, columns=[f'col_{i}' for i in range(n_cols)])
# check memory footprint
    print('mydf (MB):', mydf.memory_usage(deep=True).sum() / 1024 / 1024)
imaxs = range(1, 33)
# With local df, call function with partial
start = time()
with Pool() as pool:
fun = partial(_fun_local, mydf)
results_partial = pool.map(fun, imaxs)
print(f'local-partial took: {time()-start}')
# With local df, call function with starmap
start = time()
with Pool() as pool:
results_starmap = pool.starmap(_fun_local, [(mydf, imax) for imax in imaxs])
print(f'local-starmap took: {time()-start}')
# With global mydf
start = time()
with Pool() as pool:
results_global = pool.map(_fun_global, imaxs)
print(f'global took: {time()-start}')
    return results_partial, results_starmap, results_global
if __name__ == '__main__':
results = dummy()
mydf (MB): 1024.0001411437988
local-partial took: 89.05407881736755
local-starmap took: 88.06274890899658
global took: 0.09803605079650879
When you fork a child process (in this case you are forking multiprocessing.cpu_count() processes) it is true that the child process inherits the memory of the forking process. But copy-on-write semantics are used, so when that inherited memory is modified it is first copied, resulting in increased memory utilization. Even though the child process is not explicitly updating the global dataframe, once the dataframe is referenced it gets copied, because Python uses reference counts for memory management and the reference count for the dataframe gets incremented. Now consider the following code.
from multiprocessing import Pool
import os
import time
SLEEP = False
some_data = [0, 1, 2, 3, 4, 5, 6, 7]
def worker(i):
if SLEEP:
time.sleep(.5)
return some_data[i], os.getpid()
def main():
with Pool(8) as pool:
results = pool.map(worker, range(8))
print('results =', results)
if __name__ == '__main__':
main()
Prints:
results = [(0, 35988), (1, 35988), (2, 35988), (3, 35988), (4, 35988), (5, 35988), (6, 35988), (7, 35988)]
We see that a single pool process (PID=35988) processed all 8 submitted tasks. This is because each task is so short-running that a single pool process can pull all 8 tasks from the pool's task queue before the other pool processes have finished starting and attempt to process tasks. This also means that the global some_data was only referenced by a single pool process and therefore only copied once.
If we now change SLEEP to True, the output is now:
results = [(0, 45324), (1, 4828), (2, 19760), (3, 8420), (4, 41944), (5, 58220), (6, 46340), (7, 21628)]
Each of the 8 pool processes processed one task and therefore some_data was copied 8 times.
What Does This Mean?
Your _fun_global worker function is also short-running if it is not sleeping, so probably a single pool process handles all submitted tasks and the dataframe is copied only once. But if you do sleep, then each pool process will get to process a task and the dataframe will be copied N times, where N is the size of the pool (os.cpu_count()).
But even when the inherited memory must be copied, this is a relatively fast operation compared to the situation where you are passing a local dataframe as an argument. In that case the copying is done with pickle: the dataframe is first serialized in the main process and then de-serialized again in the child process, and this happens for every task (or chunk of tasks) that carries the dataframe as an argument.
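A rough way to see where the ~88 seconds go is to time a single pickle round trip of a dataframe the same size as the question's; this is only a sketch, and the actual numbers will depend on your machine and pickle protocol:

import pickle
from time import time

import numpy as np
import pandas as pd

# Build a ~1 GB dataframe comparable to the question's
df = pd.DataFrame(np.random.rand(13_421_773, 10),
                  columns=[f'col_{i}' for i in range(10)])

start = time()
payload = pickle.dumps(df)     # what the main process does per submitted task
clone = pickle.loads(payload)  # what the child process does per received task
elapsed = time() - start
print(f'one pickle round trip: {elapsed:.2f} s ({len(payload) / 1024**2:.0f} MB)')
print(f'rough cost for 32 tasks: {32 * elapsed:.0f} s')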
Summary
pickle is slower than just doing a byte-for-byte copy.
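To put a number on that on your own hardware, here is a small illustrative comparison between a plain buffer copy and a pickle round trip of the same array (timings vary with machine and pickle protocol):

import pickle
from time import time

import numpy as np

data = np.random.rand(13_421_773, 10)  # ~1 GB of float64, like the question's dataframe

start = time()
copied = data.copy()  # plain byte-for-byte copy of the underlying buffer
print(f'raw copy:          {time() - start:.2f} s')

start = time()
restored = pickle.loads(pickle.dumps(data))  # serialize + de-serialize round trip
print(f'pickle round trip: {time() - start:.2f} s')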