What can multiprocessing and dill do together?

I would like to use the multiprocessing library in Python. Sadly, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions in __main__. All three of these are important to me.

In [1]: import pickle

In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
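For reference, the same limitation can be reproduced on Python 3 with only the standard library. This is a minimal sketch, not part of the original session: pickle stores a function as a reference to its importable name, so anything that cannot be looked up by name (a lambda, a closure) is rejected. The helper names can_pickle and make_adder are made up for illustration.

```python
import json
import pickle


def make_adder(n):
    """Return a closure; the inner function has no importable name."""
    def add(x):
        return x + n
    return add


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python version, unpicklable functions raise
        # either PicklingError or AttributeError.
        return False


if __name__ == "__main__":
    # An importable function is pickled as a reference ("json.dumps").
    print("json.dumps:", can_pickle(json.dumps))
    # A lambda cannot be found under the name "<lambda>".
    print("lambda:    ", can_pickle(lambda x: x))
    # A closure is a local object and cannot be found by name either.
    print("closure:   ", can_pickle(make_adder(1)))
```

The point is only that pickle never stores the function body, which is why the two failing cases have nothing to fall back on.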

Fortunately there is dill, a more robust pickle. Apparently dill performs magic on import to make pickle work:

In [3]: import dill

In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
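As far as I can tell, the "magic" comes down to serializing the function itself (its code object) rather than a reference to its name. The sketch below illustrates that idea on Python 3.8+ using only the standard library; it is a toy stand-in and not dill's actual implementation. ByValuePickler, dumps_by_value and _rebuild_function are hypothetical names, and the sketch assumes lambdas with no closure and no references to globals.

```python
import builtins
import io
import marshal
import pickle
import types


def _rebuild_function(code_bytes, name, defaults):
    """Recreate a function from marshalled bytecode.

    Assumption: the same Python version is used on both sides, and the
    function only needs builtins (no other globals, no closure cells).
    """
    code = marshal.loads(code_bytes)
    return types.FunctionType(code, {"__builtins__": builtins}, name, defaults)


class ByValuePickler(pickle.Pickler):
    """Pickler that ships closure-free lambdas by value instead of by name."""

    def reducer_override(self, obj):
        if (isinstance(obj, types.FunctionType)
                and obj.__name__ == "<lambda>"
                and obj.__closure__ is None):
            payload = marshal.dumps(obj.__code__)
            return _rebuild_function, (payload, obj.__name__, obj.__defaults__)
        # Everything else falls back to pickle's normal behaviour.
        return NotImplemented


def dumps_by_value(obj):
    """Serialize obj with ByValuePickler and return the bytes."""
    buffer = io.BytesIO()
    ByValuePickler(buffer).dump(obj)
    return buffer.getvalue()


if __name__ == "__main__":
    data = dumps_by_value(lambda x: x ** 2)
    restored = pickle.loads(data)  # plain pickle can load the result
    print(restored(7))
```

A real by-value serializer also has to carry globals, closure cells and nested functions, which this sketch deliberately leaves out.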

This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work:

import multiprocessing as mp
import dill

p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))

Why is this? What am I missing? Exactly what are the limitations on the multiprocessing+dill combination?

Temporary Edit for J.F Sebastian

mrockli@mrockli-notebook:~/workspace/toolz$ python 
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/mrockli/Software/anaconda/lib/python2.7/", line 808, in __bootstrap_inner
  File "/home/mrockli/Software/anaconda/lib/python2.7/", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/", line 342, in _handle_tasks
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

...lots of junk...

[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling
[INFO/PoolWorker-6] child process calling
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling


  • multiprocessing makes some bad choices about pickling. Don't get me wrong, it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill to do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing were to use pickle instead of cPickle, and also drop some of its own pickling overrides, then dill could take over and provide much fuller serialization for multiprocessing.

    Until that happens, there's a fork of multiprocessing called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, like multiple arguments in the map function. Pathos is due for a release after some mild updating, mostly conversion to Python 3.x.

    Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
    [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import dill
    >>> from pathos.multiprocessing import ProcessingPool    
    >>> pool = ProcessingPool(nodes=4)
    >>> result = pool.map(lambda x: x**2, range(10))
    >>> result
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    and just to show off a little of what pathos.multiprocessing can do...

    >>> def busy_add(x,y, delay=0.01):
    ...     for n in range(x):
    ...        x += n
    ...     for n in range(y):
    ...        y -= n
    ...     import time
    ...     time.sleep(delay)
    ...     return x + y
    >>> def busy_squared(x):
    ...     import time, random
    ...     time.sleep(2*random.random())
    ...     return x*x
    >>> def squared(x):
    ...     return x*x
    >>> def quad_factory(a=1, b=1, c=0):
    ...     def quad(x):
    ...         return a*x**2 + b*x + c
    ...     return quad
    >>> square_plus_one = quad_factory(2,0,1)
    >>> def test1(pool):
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(squared, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.imap.__name__
    ...     start = time.time()
    ...     res = pool.imap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = list(res)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.amap.__name__
    ...     start = time.time()
    ...     res = pool.amap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = res.get()
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    >>> def test2(pool, items=4, delay=0):
    ...     _x = range(-items/2,items/2,2)
    ...     _y = range(len(_x))
    ...     _d = [delay]*len(_x)
    ...     print map
    ...     res1 = map(busy_squared, _x)
    ...     res2 = map(busy_add, _x, _y, _d)
    ...     print pool.map
    ...     _res1 = pool.map(busy_squared, _x)
    ...     _res2 = pool.map(busy_add, _x, _y, _d)
    ...     assert _res1 == res1
    ...     assert _res2 == res2
    ...     print pool.imap
    ...     _res1 = pool.imap(busy_squared, _x)
    ...     _res2 = pool.imap(busy_add, _x, _y, _d)
    ...     assert list(_res1) == res1
    ...     assert list(_res2) == res2
    ...     print pool.amap
    ...     _res1 = pool.amap(busy_squared, _x)
    ...     _res2 = pool.amap(busy_add, _x, _y, _d)
    ...     assert _res1.get() == res1
    ...     assert _res2.get() == res2
    ...     print ""
    >>> def test3(pool): # test against a function that should fail in pickle
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(square_plus_one, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    >>> def test4(pool, maxtries, delay):
    ...     print pool
    ...     m = pool.amap(busy_add, x, x)
    ...     tries = 0
    ...     while not m.ready():
    ...         time.sleep(delay)
    ...         tries += 1
    ...         print "TRY: %s" % tries
    ...         if tries >= maxtries:
    ...             print "TIMEOUT"
    ...             break
    ...     print m.get()
    >>> import time
    >>> x = range(18)
    >>> delay = 0.01
    >>> items = 20
    >>> maxtries = 20
    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> pool = Pool(nodes=4)
    >>> test1(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
    time to results: 0.0553691387177
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
    time to queue: 7.91549682617e-05
    time to results: 0.102381229401
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
    time to queue: 7.08103179932e-05
    time to results: 0.0489699840546
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
    >>> test2(pool, items, delay)
    <built-in function map>
    <bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>
    >>> test3(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
    time to results: 0.0523059368134
    y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]
    >>> test4(pool, maxtries, delay)
    <pool ProcessingPool(ncpus=4)>
    TRY: 1
    TRY: 2
    TRY: 3
    TRY: 4
    TRY: 5
    TRY: 6
    TRY: 7
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]
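For comparison, the three map flavours exercised above (blocking, iterator, asynchronous) and the multi-argument map have rough counterparts in the Python 3 standard library: map, imap, map_async and starmap. The sketch below is an approximation under a loud assumption: it uses multiprocessing.pool.ThreadPool, which shares the Pool API but runs in threads, so nothing is pickled and a closure such as square_plus_one can be passed directly. It does not use pathos, it is not how pathos is implemented, and it gives no CPU parallelism for pure-Python work. run_all and add are made-up names.

```python
import time
from multiprocessing.pool import ThreadPool


def quad_factory(a=1, b=1, c=0):
    """Return a closure computing a*x**2 + b*x + c, as in the transcript."""
    def quad(x):
        return a * x**2 + b * x + c
    return quad


def add(x, y):
    """Two-argument function used to show a multi-argument map."""
    return x + y


def run_all(xs, max_tries=1000, delay=0.01):
    """Run the same work through map, imap, map_async and starmap."""
    square_plus_one = quad_factory(2, 0, 1)
    with ThreadPool(4) as pool:
        blocking = pool.map(square_plus_one, xs)       # waits for all results
        lazy = list(pool.imap(square_plus_one, xs))    # ordered iterator
        pending = pool.map_async(square_plus_one, xs)  # returns immediately
        tries = 0
        # Poll for completion, similar in spirit to test4 above.
        while not pending.ready() and tries < max_tries:
            time.sleep(delay)
            tries += 1
        asynchronous = pending.get(timeout=10)
        # starmap unpacks each tuple into positional arguments.
        pairs = pool.starmap(add, zip(xs, xs))
    return blocking, lazy, asynchronous, pairs


if __name__ == "__main__":
    for result in run_all(list(range(18))):
        print(result)
```

With a process-based Pool the same calls exist, but the closure would again have to survive pickling, which is the limitation the question started from.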