
Safely write to file in parallel with pathos.multiprocessing


pathos.multiprocessing is known to have an advantage over Python's standard multiprocessing library: it uses dill instead of pickle, so it can serialize a wider range of functions and other objects.
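To make the serialization point concrete, here is a small sketch that checks whether a given `dumps` function can handle an object. The helper `can_serialize` and the `square` lambda are hypothetical names used only for illustration. The standard `pickle` module stores functions by importable name, so a lambda fails; `dill`, if it is installed, serializes the function itself.

```python
import pickle


def can_serialize(obj, dumps):
    """Return True if `dumps` can serialize `obj`, False if it raises."""
    try:
        dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# A lambda has no importable name ("<lambda>"), so pickle cannot store it by reference.
square = lambda x: x ** 2

if __name__ == "__main__":
    print("pickle:", can_serialize(square, pickle.dumps))
    try:
        import dill  # assumption: dill is installed (pathos depends on it)
        print("dill:", can_serialize(square, dill.dumps))
    except ImportError:
        print("dill is not installed")
```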

But writing pool.map() results to a file line by line with pathos causes trouble. If all processes in a ProcessPool write their results line by line into a single file, they interfere with each other, writing lines simultaneously and corrupting the output. With the ordinary multiprocessing package, I was able to make each process write to its own separate file, named with the current process ID, like this:

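import gzip
example_data = range(100)

def process_point(point):
    # mpp is bound by whichever "import ... as mpp" line runs below
    output = "output-%d.gz" % mpp.current_process().pid
    with gzip.open(output, "at") as fout:  # text-append mode, one file per process
        fout.write('%d\n' % point**2)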

Then, this code works well:

import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)

But this code doesn't:

from pathos import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)

and throws an AttributeError:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-10-a6fb174ec9a5> in <module>()
----> 1 pool.map(process_point, example_data)

/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in map(self, func, iterable, chunksize)
    128         '''
    129         assert self._state == RUN
--> 130         return self.mapAsync(func, iterable, chunksize).get()
    131
    132     def imap(self, func, iterable, chunksize=1):

/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in get(self, timeout)
    371             return self._value
    372         else:
--> 373             raise self._value
    374
    375     def _set(self, i, obj):

AttributeError: 'module' object has no attribute 'current_process'

There is no current_process() in pathos, and I cannot find anything similar to it. Any ideas?
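A different way to avoid interleaved writes, not taken from the question or the solution, is to let the workers only compute and have the parent process do all of the writing. Below is a minimal sketch with the standard library's `multiprocessing.Pool`; the names `square_line` and `write_results` and the `output.txt` path are hypothetical. pathos pools also provide `map` and `imap`, so they should be usable as the `mapper` argument in the same way, though that is an assumption not tested here.

```python
import multiprocessing


def square_line(point):
    """Compute one output line; workers only compute, they never touch the file."""
    return "%d\n" % point ** 2


def write_results(path, data, mapper=map):
    """Write every result from the calling process, so only one writer touches `path`.

    `mapper` is any map-like callable: the builtin map, or a pool's imap.
    """
    with open(path, "w") as fout:
        for line in mapper(square_line, data):
            fout.write(line)


if __name__ == "__main__":
    with multiprocessing.Pool(8) as pool:
        # imap yields results in input order as the workers finish them
        write_results("output.txt", range(100), mapper=pool.imap)
```

Because only the parent opens the file, lines never interleave and `imap` keeps them in input order; the trade-off is that every result travels back to the parent through the pool.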


Solution

  • I'm the pathos author. While your answer works for this case, it's probably better to use the fork of multiprocessing within pathos, found at the rather obtuse location: pathos.helpers.mp.

    This gives you a one-to-one mapping with multiprocessing, but with better serialization. Thus, you'd use pathos.helpers.mp.current_process.

    Sorry, it's both undocumented and not obvious… I should improve at least one of those two issues.
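A minimal sketch of that suggestion, assuming a pathos install in which `pathos.helpers.mp` exposes the multiprocessing-compatible API (`Pool`, `current_process`). If pathos is missing, the sketch falls back to the standard library module, which has the same API. The `process_point` function and `output-<pid>.gz` naming come from the question; `merge_parts` and the `outdir` parameter are hypothetical additions for illustration.

```python
import glob
import gzip
import os
import shutil
import tempfile
from functools import partial

try:
    # pathos' fork of multiprocessing; serializes with dill instead of pickle
    from pathos.helpers import mp
except ImportError:
    # Assumption for portability: fall back to the stdlib module with the same API
    import multiprocessing as mp


def process_point(point, outdir="."):
    """Append point**2 to a gzip file owned by the calling process."""
    # One file per process ID, so no two processes ever append to the same file.
    output = os.path.join(outdir, "output-%d.gz" % mp.current_process().pid)
    with gzip.open(output, "at") as fout:
        fout.write("%d\n" % point ** 2)


def merge_parts(outdir, merged_path):
    """Concatenate all per-process part files into one plain-text file."""
    with open(merged_path, "w") as merged:
        for part in sorted(glob.glob(os.path.join(outdir, "output-*.gz"))):
            # gzip reads the multiple appended members as one stream
            with gzip.open(part, "rt") as fin:
                shutil.copyfileobj(fin, merged)


if __name__ == "__main__":
    outdir = tempfile.mkdtemp()
    pool = mp.Pool(8)
    pool.map(partial(process_point, outdir=outdir), range(100))
    pool.close()
    pool.join()
    merge_parts(outdir, os.path.join(outdir, "merged.txt"))
```

Lines in the merged file are grouped by worker rather than in input order, since each worker appended only to its own part file.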