I tried to run multiprocessing with a large dataset.
When I run the script below with a for loop, the total run time is 1.5 sec.
def get_vars(accessCode, user_profile, wt, meals, instance_method='get_wt_adherence'):
>> n_cpus = multiprocessing.cpu_count()
>> get_wt_adherence = partial(get_vars, user_profile, wt, meals,
>> pool = multiprocessing.Pool(n_cpus-5)
>> result = pool.map(get_wt_adherence, accessCodes)
>> concated_result = pd.concat(result)
# 2020.03.26 Updated
: Class name edited. 'NOOM' -> 'DATA_GEN'
COL_WEEK = ['{}week'.format(i) for i in range(1, 17)]
data_gen = DATA_GEN(accessCode, user_profile, wt, meals)
if instance_method == 'get_wt_adherence':
func = data_gen.get_wt_adherence
elif instance_method == 'get_meal_adherence':
func = data_gen.get_meal_adherence
elif instance_method == 'get_color_food':
func = data_gen.get_color_food
elif instance_method == 'get_daily_cal':
func = data_gen.get_daily_cal
row = pd.DataFrame([func(weeks) for weeks in range(1, 17)]).T
row.columns = COL_WEEK
row['accessCode'] = accessCode
return row
from noom.handler import DATA_GEN
from functools import partial
import multiprocessing
# start_time = time.time()
get_wt = partial(get_vars, user_profile=user_profile, wt=wt_logs, meals=meals, instance_method='get_wt_adherence')
for i in range(10):
However, when I tried to run this script using multiprocessing, the script did not respond, even though 'accessCodes' is a list with only 100 elements.
I suspect the problem is the 'get_wt' function, which is created with functools.partial.
n_cpus = multiprocessing.cpu_count()
pool = multiprocessing.Pool(n_cpus-15)
result_wt = pool.map(get_wt, accessCodes) ; print('wt adherence finished')
How can I solve this problem? The error is shown below.
error Traceback (most recent call last)
<ipython-input-22-73ddf2e21bbd> in <module>
2 n_cpus = multiprocessing.cpu_count()
3 pool = multiprocessing.Pool(n_cpus-15)
----> 4 result_wt = pool.map(get_wt_adherence, accessCodes[1:10]) ; print('wt adherence finished')
5 pool.close()
6 time.time() - start_time
/usr/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
264 in a list that is returned.
265 '''
--> 266 return self._map_async(func, iterable, mapstar, chunksize).get()
268 def starmap(self, func, iterable, chunksize=None):
/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
646 def _set(self, i, obj):
/usr/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
422 break
423 try:
--> 424 put(task)
425 except Exception as e:
426 job, idx = task[:2]
/usr/lib/python3.6/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
208 def recv_bytes(self, maxlength=None):
/usr/lib/python3.6/multiprocessing/connection.py in _send_bytes(self, buf)
391 n = len(buf)
392 # For wire compatibility with 3.2 and lower
--> 393 header = struct.pack("!i", n)
394 if n > 16384:
395 # The payload is large so Nagle's algorithm won't be triggered
error: 'i' format requires -2147483648 <= number <= 2147483647
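This is a bug in multiprocessing affecting Python 3.7 and below: the length of each message sent to a worker is packed as a signed 32-bit integer (struct.pack("!i", n)), so sending any pickled task larger than 2 GiB fails with this error.
The issue has been fixed in Python 3.8+.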