In the operation below (adapted from the Dask DataFrame API docs), if I don't attach to a scheduler (i.e. leave the line assigning the client variable commented out), the operation completes successfully, as expected.
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

connection_loc = 'foobar.net:8786'   # address of the remote scheduler
# client = Client(connection_loc)    # uncommenting this line triggers the error

df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [1., 2., 3., 4., 5.]})
ddf = dd.from_pandas(df, npartitions=2)

# Rolling sum over a 2-row window; map_overlap shares 2 rows before and 0 after
# with each neighbouring partition
foo = ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()
The moment that same line is uncommented and a client connection is assigned, the following error occurs: TypeError: unorderable types: list() >= int() (see the full traceback below).
Examining the traceback, I can see that the bytestring the worker is trying to deserialize is not what I would expect it to be (see the first line of the full traceback: distributed.protocol.pickle - INFO - Failed to deserialize).
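To double-check what a payload like this contains, I can disassemble a comparable one locally with the standard-library pickletools module. A minimal sketch; it rebuilds a similar payload with cloudpickle rather than pasting the bytes from the log:

import cloudpickle
import pickletools

# cloudpickle the same kind of lambda that the computation ships to the workers
payload = cloudpickle.dumps(lambda df: df.rolling(2).sum())

# Print the pickle opcodes; the output shows how cloudpickle encodes the lambda
# (the helper names depend on the cloudpickle version, e.g. _fill_function /
# _make_skel_func in the log below)
pickletools.dis(payload)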
I've completely stopped and restarted the remote containers running both the worker and the scheduler, to no avail. I've also tried client.restart() with no luck. Any idea why this unexpected task is being passed to the worker and throwing this error? Any solution to get Dask to stop doing this?
Full traceback:
dask_worker_1 | distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95+\x01\x00\x00\x00\x00\x00\x00(\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_fill_function\x94\x93\x94(h\x00\x8c\x0f_make_skel_func\x94\x93\x94h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x02KCC\x0e|\x00j\x00d\x01\x83\x01j\x01\x83\x00S\x00\x94NK\x02\x86\x94\x8c\x07rolling\x94\x8c\x03sum\x94\x86\x94\x8c\x02df\x94\x85\x94\x8c\x1fdask_method/dask_dist_matrix.py\x94\x8c\x08<lambda>\x94K\rC\x00\x94))t\x94R\x94]\x94}\x94\x87\x94R\x94}\x94N}\x94tRN\x8c3('from_pandas-ddc065084280667dd51853b144bdd4e8', 0)\x94NK\x02K\x00)}\x94t\x94."
dask_worker_1 | Traceback (most recent call last):
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1 | return pickle.loads(x)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1 | if cell_count >= 0 else
dask_worker_1 | TypeError: unorderable types: list() >= int()
dask_worker_1 | distributed.worker - WARNING - Could not deserialize task
dask_worker_1 | Traceback (most recent call last):
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 1113, in add_task
dask_worker_1 | self.tasks[key] = _deserialize(function, args, kwargs, task)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 573, in _deserialize
dask_worker_1 | args = pickle.loads(args)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1 | return pickle.loads(x)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1 | if cell_count >= 0 else
dask_worker_1 | TypeError: unorderable types: list() >= int()
Dask: 0.15.0
Distributed: 1.17.1
OS: Ubuntu 16.04.2 LTS
I suspect that you have a mismatch in cloudpickle versions between your workers and your client. You'll have to ensure that all of your workers and clients have the same software environment. The following command can help:
client.get_versions(check=True)
I don't think this check includes cloudpickle in dask.distributed version 1.17.1, but it should in all subsequent versions (it works now in master).
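Since the 1.17.1 check may not cover cloudpickle, here is a minimal sketch of inspecting the reported versions by hand (the exact layout of the returned dict varies between distributed releases, so treat the key names as an assumption):

from dask.distributed import Client

client = Client('foobar.net:8786')  # same placeholder scheduler address as above

# Raises an error if the client, scheduler, and workers disagree on the
# versions of the packages it knows how to check
client.get_versions(check=True)

# Inspect the raw report for packages the check may miss (e.g. cloudpickle)
versions = client.get_versions()
print(versions['client'])                    # packages installed on this client
for addr, info in versions['workers'].items():
    print(addr, info)                        # packages installed on each worker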