I am trying to use map_partitions
with a function that depends implicitly on a large object. The code looks something like this:
import numpy as np

big_array = np.array(...)

def do_something(partition):
    # some operations involving partition and big_array
    ...

dask_dataframe.map_partitions(do_something)
I then get the following error:
Traceback (most recent call last):
File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/distributed/protocol/core.py", line 72, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/Users/wiebuschm/.pyenv/versions/3.8.5/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 196, in msgpack._cmsgpack.Packer._pack
ValueError: bytes object is too large
distributed.comm.utils - ERROR - bytes object is too large
Since big_array somehow needs to be shipped to all the workers, I'm willing to believe that we encounter some large bytes objects along the way. But what's the upper limit? And how can I increase it?
"Since big_array somehow needs to be shipped to all the workers I'm willing to believe that we encounter some large bytes objects along the way."

Here is your clue: don't make large function definitions like this. Because do_something closes over big_array, the whole array ends up embedded in the serialized task graph that is sent through the scheduler, which is where msgpack fails. You should use scatter to move the array to the workers once, if you cannot have them load it themselves.
def do_something(partition, big_array):
    # some operations involving partition and big_array
    ...

array_on_workers = client.scatter(big_array)
dask_dataframe.map_partitions(do_something, array_on_workers)
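For completeness, here is a minimal self-contained sketch of that pattern. The local Client, the random big_array, the toy do_something, and the meta specification are placeholders standing in for whatever the real code does; only the scatter-then-map_partitions shape is the point.

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # starts a local cluster with a few worker processes

# Hypothetical stand-ins for the real data
big_array = np.random.random(10_000_000)
ddf = dd.from_pandas(pd.DataFrame({"x": np.arange(100.0)}), npartitions=4)

def do_something(partition, big_array):
    # toy operation combining each partition with the large array
    return partition.assign(y=partition["x"] * big_array.mean())

# Ship the array to the cluster once; map_partitions is given a Future,
# which dask resolves to the real array before calling do_something.
array_on_workers = client.scatter(big_array, broadcast=True)

result = ddf.map_partitions(
    do_something,
    array_on_workers,
    meta={"x": "f8", "y": "f8"},
).compute()

If the workers can instead load the array themselves (the alternative mentioned above), skip scatter and have do_something read it from shared storage, caching the load (for example with functools.lru_cache) so each worker process reads it only once.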