Tags: python, multiprocessing, shared-memory, pool

Combine Pool.map with shared memory Array in Python multiprocessing


I have a very large (read-only) array of data that I want to be processed by multiple processes in parallel.

I like the Pool.map function and would like to use it to apply functions to that data in parallel.

I saw that one can use the Value or Array class to share data between processes through shared memory. But when I try this with Pool.map, I get a "RuntimeError: SynchronizedString objects should only be shared between processes through inheritance":

Here is a simplified example of what I am trying to do:

from multiprocessing import Pool, Array

def count_it(arr, key):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    # want to share it using shared memory
    toShare = Array('c', testData)

    # this works
    print(count_it(toShare, b"a"))

    pool = Pool()

    # RuntimeError here
    print(pool.map(count_it, [(toShare, key) for key in [b"a", b"b", b"s", b"d"]]))

Can anyone tell me what I am doing wrong here?

So what I would like to do is pass information about a newly allocated shared-memory array to the processes after they have been created in the process pool.


Solution

  • Trying again as I just saw the bounty ;)

    Basically, I think the error message means what it says - multiprocessing shared-memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the whole point is that the data lives in shared memory. So you have to make the shared array global. I think it's neater to make it an attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well.

    Taking on board your point about not wanting to set the data before the fork, here is a modified example. If you wanted more than one possible shared array (and that's why you wanted to pass toShare as an argument), you could similarly make a global list of shared arrays and just pass the index to count_it (the loop would become for c in toShare[i]:) - see the sketch after the example below.

    from multiprocessing import Pool, Array

    def count_it(key):
        count = 0
        for c in toShare:
            if c == key:
                count += 1
        return count

    if __name__ == '__main__':
        # allocate shared array - want lock=False in this case since we
        # aren't writing to it and want to allow multiple processes to access
        # at the same time - I think with lock=True there would be little or
        # no speedup
        maxLength = 50
        toShare = Array('c', maxLength, lock=False)

        # fork
        pool = Pool()

        # can set data after fork
        testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
        if len(testData) > maxLength:
            raise ValueError("Shared array too small to hold data")
        toShare[:len(testData)] = testData

        print(pool.map(count_it, [b"a", b"b", b"s", b"d"]))
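
    Here is a minimal sketch of the multiple-arrays variant described above (my own illustration - the list name shareList and the hard-coded key b"a" are just for demonstration). Only the index travels through pool.map, so nothing shared needs pickling, and the arrays are inherited across the fork as before (so, again, Unix-only):

    from multiprocessing import Pool, Array

    def count_a(i):
        # look up the shared array by index in the inherited global list
        return sum(1 for c in shareList[i] if c == b"a")

    if __name__ == '__main__':
        # two independent read-only shared arrays; the workers inherit
        # shareList when the pool forks
        shareList = [Array('c', b"abcabc", lock=False),
                     Array('c', b"aaabbb", lock=False)]

        pool = Pool()
        print(pool.map(count_a, [0, 1]))  # prints [2, 3]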
    

    [EDIT: The above doesn't work on Windows because Windows doesn't use fork. However, the below does work on Windows, still using Pool, so I think this is the closest to what you want:

    from multiprocessing import Pool, Array
    import mymodule

    def count_it(key):
        count = 0
        for c in mymodule.toShare:
            if c == key:
                count += 1
        return count

    def initProcess(share):
        mymodule.toShare = share

    if __name__ == '__main__':
        # allocate shared array - want lock=False in this case since we
        # aren't writing to it and want to allow multiple processes to access
        # at the same time - I think with lock=True there would be little or
        # no speedup
        maxLength = 50
        toShare = Array('c', maxLength, lock=False)

        # create the pool - each worker runs initProcess at startup, which
        # stashes the shared array where count_it can find it
        pool = Pool(initializer=initProcess, initargs=(toShare,))

        # can set data after the pool is created
        testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
        if len(testData) > maxLength:
            raise ValueError("Shared array too small to hold data")
        toShare[:len(testData)] = testData

        print(pool.map(count_it, [b"a", b"b", b"s", b"d"]))
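
    With the sample testData above, the final map call in either version should print [2, 3, 9, 12] - the counts of a, b, s and d respectively.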
    

    Not sure why pool.map won't pickle the array but Process and the Pool initializer will - I think perhaps it has to be transferred at the point of subprocess initialisation on Windows. Note that the data is still set after the fork though.]
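
    As a small illustration of that asymmetry (my own sketch, not from the original answer): passing the shared array as a Process argument works, because multiprocessing transfers it with its own machinery at process-creation time, while sending it through pool.map pickles it after the workers already exist and reproduces the question's RuntimeError. On current Python 3 the error is raised straight from the map call:

    from multiprocessing import Pool, Process, Array

    def count_a(arr):
        print(sum(1 for c in arr if c == b"a"))

    if __name__ == '__main__':
        data = Array('c', b"abcabc", lock=False)

        # OK: arguments to Process are transferred by multiprocessing's
        # own machinery when the child process is created
        p = Process(target=count_a, args=(data,))
        p.start()
        p.join()

        # Fails: pool.map pickles task arguments over a queue to workers
        # that already exist, which shared ctypes objects refuse
        with Pool() as pool:
            try:
                pool.map(count_a, [data])
            except RuntimeError as err:
                print("pool.map raised:", err)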