Can I use dask.delayed on a function wrapped with ctypes?


The goal is to use dask.delayed to parallelize some 'embarrassingly parallel' sections of my code. The code calls a Python function that wraps a C function using ctypes. To understand the errors I was getting, I wrote a very basic example.

The C function:

double zippy_sum(double x, double y)
{
    return x + y;
}

The Python:

from dask.distributed import Client
client = Client(n_workers = 4)
client

import os
import dask
import ctypes

current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double

def zippy(x, y):

    z = _zippy_sum(x, y)

    return z

result = dask.delayed(zippy)(1., 2.)
result.compute()

The Traceback:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
   3286 with _cache_lock:
-> 3287 result = cache_dumps[func]
   3288 except KeyError:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/utils.py in __getitem__(self, key)
   1517 def __getitem__(self, key):
-> 1518 value = super().__getitem__(key)
   1519 self.data.move_to_end(key)

~/.edm/envs/evaxi3.6/lib/python3.6/collections/__init__.py in __getitem__(self, key)
    990 return self.__class__.__missing__(self, key)
--> 991 raise KeyError(key)
    992 def __setitem__(self, key, item): self.data[key] = item

KeyError: <function zippy at 0x11ffc50d0>

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     40 if b"__main__" in result:
---> 41 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     42 else:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
   1147 cp = CloudPickler(file, protocol=protocol)
-> 1148 cp.dump(obj)
   1149 return file.getvalue()

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    490 try:
--> 491 return Pickler.dump(self, obj)
    492 except RuntimeError as e:

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in dump(self, obj)
    408 self.framer.start_framing()
--> 409 self.save(obj)
    410 self.write(STOP)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    565 else:
--> 566 return self.save_function_tuple(obj)
    567

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    779 state['kwdefaults'] = func.__kwdefaults__
--> 780 save(state)
    781 write(pickle.TUPLE)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
    820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
    822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items)
    846 save(k)
--> 847 save(v)
    848 write(SETITEMS)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
    820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
    822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items)
    851 save(k)
--> 852 save(v)
    853 write(SETITEM)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    495 if reduce is not None:
--> 496 rv = reduce(self.proto)
    497 else:

ValueError: ctypes objects containing pointers cannot be pickled

Unfortunately, I still do not understand the errors! I am just getting started with dask and have only basic experience with ctypes. Does anyone have suggestions for how to tackle this, or even for understanding what needs to be tackled?

Thanks!


Solution

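  • Indeed, you cannot serialise a function that references a ctypes C function, whether through its globals, its closure or its arguments. Because zippy is defined in your main script (__main__), cloudpickle serialises it by value, including the global _zippy_sum that it uses, and that ctypes function pointer is what raises the ValueError. However, if your function lives in a module that every worker can import, you end up serialising just the module and function name, and each worker imports the module and loads the library itself.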

    module zippy.py (somewhere on your Python path, perhaps the current directory for the example):

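    import os
    import ctypes
    
    # Look for zippy.so next to this module rather than in the working
    # directory, which may differ on the workers (assumes the two files
    # are kept side by side).
    current_dir = os.path.abspath(os.path.dirname(__file__))
    _mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
    
    # Declare the C signature: double zippy_sum(double, double)
    _zippy_sum = _mod.zippy_sum
    _zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
    _zippy_sum.restype = ctypes.c_double
    
    def zippy(x, y):
        """Return x + y, computed by the C function zippy_sum."""
        return _zippy_sum(x, y)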
    

    main script:

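    import dask
    from dask.distributed import Client
    import zippy
    
    if __name__ == "__main__":
        # if running as a script, the guard keeps worker processes from
        # re-running this block when they re-import the script
        client = Client(n_workers=4)
    
        # only the reference to zippy.zippy is sent to the workers
        result = dask.delayed(zippy.zippy)(1., 2.)
        print(result.compute())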
    

    The other solution, if you don't want to make a module, is to do all your C imports and definitions within the function.

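    def zippy(x, y):
        # Import and load inside the function so that no ctypes object is
        # captured when the function itself is serialised.
        import ctypes
        import os
    
        # current_dir is a plain string defined in the main script, so it
        # serialises without trouble.
        _mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
    
        _zippy_sum = _mod.zippy_sum
        _zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
        _zippy_sum.restype = ctypes.c_double
    
        z = _zippy_sum(x, y)
    
        return z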
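
To see the difference between the two cases without building zippy.so, here is a minimal sketch using only the standard library. It rests on a few assumptions that are not part of the original example: ctypes.pythonapi.Py_IsInitialized stands in for _zippy_sum, json.dumps stands in for a function that lives in an importable module, can_pickle is a hypothetical helper, and plain pickle is used in place of the cloudpickle call that distributed makes.

```python
import ctypes
import json
import pickle

# Stand-in for _zippy_sum: a C function from the interpreter's own C API,
# exposed through ctypes (an assumption made so no zippy.so is required).
c_func = ctypes.pythonapi.Py_IsInitialized


def can_pickle(obj):
    """Hypothetical helper: True if pickle.dumps(obj) succeeds, else False."""
    try:
        pickle.dumps(obj)
        return True
    except (ValueError, TypeError, pickle.PicklingError):
        return False


# A ctypes function pointer refuses to be pickled. Anything serialised by
# value that drags such an object along fails the same way.
pointer_ok = can_pickle(c_func)

# A function from an importable module is pickled by reference: the payload
# holds only the module name and the function name, not the function body.
by_reference = pickle.dumps(json.dumps)

print("ctypes function pointer picklable:", pointer_ok)
print("payload for json.dumps:", by_reference)
```

Running a check like can_pickle on whatever you are about to hand to dask.delayed is a cheap way to find out which object is blocking serialisation before the scheduler gets involved.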