Python multiprocessing/Pathos Process pickling error - Numpy vectorised function


I'm trying to parallelise my program by running the main bulk of the code in separate processes and periodically combining their results. My code is structured like the following example, which unfortunately runs without error, so it doesn't reproduce the problem:

import abc
import numpy as np
from multiprocessing import Process
# from multiprocess.context import Process

class ProblemClassBase(metaclass=abc.ABCMeta):
  def __init__(self):
    self.problem_function_vectorised = np.vectorize(self.problem_function, otypes=[np.float64])

  @abc.abstractmethod
  def problem_function(self, arg):
    pass

  def use(self, arg):
    return self.problem_function_vectorised(arg)

class ProblemClass(ProblemClassBase):
  def __init__(self):
    super().__init__()

  def problem_function(self, arg):
    # Arbitrary example
    if arg > 2:
      return arg + 1
    else:
      return arg - 1

class NestingClass:
  def __init__(self, problem_object):
    self.po = problem_object

  def make_problem(self, arg):
    return self.po.use(arg)

class MainClass:
  def __init__(self):
    self.problem_obj = ProblemClass()
    self.nesting_obj = NestingClass(self.problem_obj)

  def run(self, arg):
    return self.nesting_obj.make_problem(arg)

  # Starting point for running the parallelisation
  @classmethod
  def run_multiproc(cls, arg):
    obj = cls()
    # Would somehow return this value
    print(obj.run(arg))

def run_parallel():
  # In reality I would start a number of processes
  proc = Process(target=MainClass.run_multiproc, args=(5,))
  proc.start()
  proc.join()

if __name__ == "__main__":
  run_parallel()

When I run my actual code, I get one of the following errors, depending on which Process class I use:

  • _pickle.PicklingError: Can't pickle <ufunc '_wind_dfn (vectorized)'>: attribute lookup _wind_dfn (vectorized) on __main__ failed (for multiprocessing.Process)
  • _pickle.PicklingError: Can't pickle <ufunc '_wind_dfn (vectorized)'>: it's not found as __main__._wind_dfn (vectorized) (for multiprocess.context.Process)

Here '_wind_dfn' is the equivalent of 'problem_function' in the code above.
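
A plausible cause, assuming the real code calls the vectorised function at least once before the object is pickled (the real code isn't shown): as far as I can tell from NumPy's implementation, when `otypes` is given, `np.vectorize` builds a ufunc with `np.frompyfunc` on the first call and caches it on the vectorize object. That ufunc is named `'<function name> (vectorized)'`, which matches `'_wind_dfn (vectorized)'` in the errors, and pickle can only save it by looking that name up in a module, which fails. The sketch below reproduces this without multiprocessing; `problem_function` is a stand-in for `_wind_dfn`, and `can_pickle` is a hypothetical helper.

```python
import pickle

import numpy as np


def problem_function(arg):
    # Stand-in for the question's problem_function / _wind_dfn
    return arg + 1 if arg > 2 else arg - 1


def can_pickle(obj):
    """Hypothetical helper: True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


vectorised = np.vectorize(problem_function, otypes=[np.float64])
print("before first call:", can_pickle(vectorised))

# With otypes given, the first call creates an internal ufunc
# (via np.frompyfunc) and caches it on `vectorised`
print(vectorised(np.arange(5)))
print("after first call:", can_pickle(vectorised))
```

This may also be why the example in the question works: `run_multiproc` constructs `MainClass` inside the child process, so nothing holding the vectorised function has to be pickled.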

I've seen answers saying the problem is caused by the way the code is nested, and that restructuring it can help, but I'm not sure how to apply that here. Does anybody have any ideas on how to fix this?


Solution

  • One possible solution is to create the vectorized function lazily, the first time it is used, instead of in __init__:

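    # In ProblemClassBase (numpy imported as np, as in the question)
    def __init__(self):
        # Don't create the vectorised function here
        self.problem_function_vectorised = None

    def use(self, arg):
        # Create it on first use, i.e. inside the process that calls use()
        if self.problem_function_vectorised is None:
            self.problem_function_vectorised = np.vectorize(self.problem_function, otypes=[np.float64])
        return self.problem_function_vectorised(arg)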
    

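    The issue, as others have pointed out, is that the NumPy vectorized function can't be pickled (the error shows pickle failing to find the ufunc '_wind_dfn (vectorized)' by name in __main__), so it has to be created in the child process rather than sent to it. This only helps as long as use() isn't called in the parent before the object is sent to the child; otherwise the vectorized function already exists and the same error occurs.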
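To also cover the case where `use()` has already run in the parent, one option (my own suggestion, not part of the original answer) is to define `__getstate__` so the vectorised function is never pickled and `use()` simply recreates it. The sketch below uses `pickle.dumps`/`pickle.loads` to stand in for what multiprocessing does when it passes an object to a spawned process; the class names follow the question, and the rest is an assumption about how the real classes look.

```python
import abc
import pickle

import numpy as np


class ProblemClassBase(metaclass=abc.ABCMeta):
    def __init__(self):
        # Created lazily in use(), as in the answer
        self.problem_function_vectorised = None

    def __getstate__(self):
        # Pickle every attribute except the vectorised function, which may
        # hold a cached ufunc that pickle can't handle; use() recreates it
        state = self.__dict__.copy()
        state["problem_function_vectorised"] = None
        return state

    @abc.abstractmethod
    def problem_function(self, arg):
        pass

    def use(self, arg):
        if self.problem_function_vectorised is None:
            self.problem_function_vectorised = np.vectorize(
                self.problem_function, otypes=[np.float64]
            )
        return self.problem_function_vectorised(arg)


class ProblemClass(ProblemClassBase):
    def problem_function(self, arg):
        # Arbitrary example, as in the question
        return arg + 1 if arg > 2 else arg - 1


obj = ProblemClass()
obj.use(np.arange(5))  # used in the "parent" before pickling
restored = pickle.loads(pickle.dumps(obj))  # roughly what a spawned Process needs
print(restored.use(np.arange(5)))
```

Because pickle calls `__getstate__` on every object it serialises, this should also work when the problem object is nested inside other objects, such as `NestingClass` and `MainClass` in the question.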