Search code examples
pythonclassmodulepython-multiprocessing

instance methods in function during Multiprocessing (python)


I tried to run multiprocessing with a large dataset.

When I run the script below with a plain for loop, the total run time is 1.5 seconds.

def get_vars(accessCode, user_profile, wt, meals, instance_method='get_wt_adherence'):
    '''
    Collect the weekly values of one DATA_GEN method for a single access code.

    A ``DATA_GEN`` is built from the arguments, the method named by
    ``instance_method`` is called once for each week 1..16, and the collected
    results are put in a DataFrame that is transposed so the weeks become the
    columns '1week' ... '16week'. An 'accessCode' column is added last.

    Parameters
    ----------
    accessCode
        Identifier passed to ``DATA_GEN`` and stored in the 'accessCode' column.
    user_profile, wt, meals
        Passed unchanged to ``DATA_GEN``.
    instance_method : str
        Name of the ``DATA_GEN`` method to call: 'get_wt_adherence',
        'get_meal_adherence', 'get_color_food' or 'get_daily_cal'.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    ValueError
        If ``instance_method`` is not one of the supported names.

    Examples
    --------
    Bind the shared inputs by keyword so that ``accessCode`` stays the single
    positional argument supplied by ``pool.map``.

    >> n_cpus = multiprocessing.cpu_count()
    >> get_wt_adherence = partial(get_vars, user_profile=user_profile, wt=wt,
                                  meals=meals,
                                  instance_method='get_wt_adherence')
    >> pool = multiprocessing.Pool(max(1, n_cpus-5))
    >> result = pool.map(get_wt_adherence, accessCodes)
    >> concated_result = pd.concat(result)

    Notes
    -----
    Arguments bound with ``partial`` are pickled and sent to the worker
    processes; on Python 3.7 and below a single pickled message larger than
    2 GiB fails with a ``struct.error``.

    Version
    -------
    # 2020.03.26 Updated
        : Class name edited. 'NOOM' -> 'DATA_GEN'

    '''
    supported_methods = (
        'get_wt_adherence',
        'get_meal_adherence',
        'get_color_food',
        'get_daily_cal',
    )
    # Fail fast, before the DATA_GEN is built, instead of hitting an unbound
    # local further down.
    if instance_method not in supported_methods:
        raise ValueError(
            'Unknown instance_method {!r}; expected one of {}'.format(
                instance_method, ', '.join(supported_methods)))

    # Single source of truth for both the method calls and the column labels.
    weeks = range(1, 17)

    data_gen = DATA_GEN(accessCode, user_profile, wt, meals)
    func = getattr(data_gen, instance_method)

    row = pd.DataFrame([func(week) for week in weeks]).T
    row.columns = ['{}week'.format(week) for week in weeks]
    row['accessCode'] = accessCode
    return row


from noom.handler import DATA_GEN
from functools import partial
import multiprocessing


# start_time = time.time()

# Bind the shared inputs once; the resulting callable only needs an access code.
get_wt = partial(
    get_vars,
    user_profile=user_profile,
    wt=wt_logs,
    meals=meals,
    instance_method='get_wt_adherence',
)

# Sequential baseline over the first ten access codes (results are discarded).
for idx in range(10):
    access_code = accessCodes[idx]
    get_wt(access_code)

However, when I tried to run this script using multiprocessing, it stopped responding, even though 'accessCodes' is only a list of 100 elements.

I suspect the problem is the 'get_wt' function, which is created with functools.partial.

# Leave 15 CPUs free, but never ask for fewer than one worker: Pool() raises
# ValueError when the process count is below 1.
n_cpus = multiprocessing.cpu_count()
n_workers = max(1, n_cpus - 15)

# NOTE: get_wt is a functools.partial, so its bound arguments are pickled and
# sent to the workers together with the tasks. On Python 3.7 and below a single
# pickled message larger than 2 GiB fails with
# "error: 'i' format requires -2147483648 <= number <= 2147483647".
# The context manager shuts the pool down even if map() raises.
with multiprocessing.Pool(n_workers) as pool:
    result_wt = pool.map(get_wt, accessCodes)
print('wt adherence finished')

How can I solve this problem? The error is shown below.

---------------------------------------------------------------------------
error                                     Traceback (most recent call last)
<ipython-input-22-73ddf2e21bbd> in <module>
      2 n_cpus = multiprocessing.cpu_count()
      3 pool = multiprocessing.Pool(n_cpus-15)
----> 4 result_wt = pool.map(get_wt_adherence, accessCodes[1:10])         ; print('wt adherence finished')
      5 pool.close()
      6 time.time() - start_time

/usr/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    264         in a list that is returned.
    265         '''
--> 266         return self._map_async(func, iterable, mapstar, chunksize).get()
    267 
    268     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

/usr/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    422                         break
    423                     try:
--> 424                         put(task)
    425                     except Exception as e:
    426                         job, idx = task[:2]

/usr/lib/python3.6/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

/usr/lib/python3.6/multiprocessing/connection.py in _send_bytes(self, buf)
    391         n = len(buf)
    392         # For wire compatibility with 3.2 and lower
--> 393         header = struct.pack("!i", n)
    394         if n > 16384:
    395             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647

Solution

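  • This is a bug in multiprocessing affecting Python 3.7 and below.

    As the traceback shows, the length of each pickled message sent to a worker is packed with struct.pack("!i", n), a signed 32-bit integer, so any single message larger than 2 GiB (2147483647 bytes) fails with this error. Here the large dataset bound into the partial is pickled along with the tasks.

    The issue has been fixed in Python 3.8+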