Tags: python, dask, dask-distributed

Dask map_partitions fails when function depends on large array


I am trying to use map_partitions with a function that depends implicitly on a large object. The code looks something like this:

import numpy as np

big_array = np.array(...)  # some very large array

def do_something(partition):
    # some operations involving partition and big_array
    ...

dask_dataframe.map_partitions(do_something)

I then get the following error:

Traceback (most recent call last):
  File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/distributed/protocol/core.py", line 72, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 196, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
distributed.comm.utils - ERROR - bytes object is too large

Since big_array somehow needs to be shipped to all the workers, I'm willing to believe that we encounter some large bytes objects along the way. But what's the upper limit? And how can I increase it?


Solution

  • Since big_array somehow needs to be shipped to all the workers, I'm willing to believe that we encounter some large bytes objects along the way.

    Here is your clue: don't define functions that close over large objects like this. When Dask serializes do_something, everything it references, including big_array, gets baked into the task message, and the msgpack wire format caps a single bytes object at 2^32 - 1 bytes (about 4 GiB), so there is no configuration setting to raise. Instead, if you cannot have the workers load the array themselves, use scatter to move it to them once, as shown below:

    def do_something(partition, big_array):
        # some operations involving partition and big_array
        ...

    # scatter returns a Future pointing at data held on the workers
    array_on_workers = client.scatter(big_array)
    dask_dataframe.map_partitions(do_something, array_on_workers)
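
    For context, here is a minimal end-to-end sketch of the scatter approach. The toy dataframe, the column names, and the body of do_something are illustrative stand-ins, not part of the original question:

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd
    from dask.distributed import Client

    client = Client()  # local cluster; point this at your real scheduler

    big_array = np.random.random(10_000)  # stand-in for the real large array

    def do_something(partition, big_array):
        # illustrative body: shift a column by the array's mean
        return partition.assign(score=partition["x"] + big_array.mean())

    ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)

    # broadcast=True pre-replicates the data to every worker, giving one
    # copy per worker instead of one transfer per task.
    array_on_workers = client.scatter(big_array, broadcast=True)

    # Explicit meta skips metadata inference, which would otherwise call
    # do_something with the Future in place of the real array.
    result = ddf.map_partitions(
        do_something, array_on_workers, meta={"x": "i8", "score": "f8"}
    ).compute()

    Because scatter hands map_partitions a Future rather than the data itself, the array crosses the network once per worker, and each task merely dereferences the locally held copy.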