I have initialized the cluster with 10 workers and 4 threads per worker; I am running this on a 12-core laptop.
cluster = makeIndividualDashboard.LocalCluster(n_workers=10, threads_per_worker=4)
client = makeIndividualDashboard.Client()
runOna(client)
client.shutdown()
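For reference, here is roughly the equivalent setup using plain dask.distributed names, with my wrapper module left out (just a sketch; the worker and thread counts are the ones I actually use):

# Rough equivalent of the setup above using plain dask.distributed,
# with the client attached to the LocalCluster explicitly.
import os
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    print("cores available:", os.cpu_count())  # 12 on my laptop
    cluster = LocalCluster(n_workers=10, threads_per_worker=4)
    client = Client(cluster)
    # ... submit the work shown below here ...
    client.shutdown()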
Below is the code where I am doing the cluster computing.
from collections import namedtuple
import dask

st = settings.as_dict()
new_settings = namedtuple("Settings", st.keys())(*st.values())
to_process = []
client.cluster.scale(10)

if mongoConnection:
    mongo_c = True
else:
    mongo_c = None

future = client.scatter([net, new_settings, avgNodesConnected, kcoreByGroup, averageTeamDensity,
                         edgesInByAttributeTableMeans, edgesInByAttributeTable, crossTeamTiesTable,
                         descendentLookup, groupDegreeTable, respondentDegreeTable, degreeTable,
                         orgTeamTree, teamMembership, graphId, selectionRange, criteria,
                         onlyForNodes, hashIds, useEnvironment, rollupToLeaders, averageTeamSize,
                         meanCrossInTiesPct, meanCrossOutTiesPct, meanCrossAllTiesPct, mongo_c])

for node in nodes:
    if FILTER_FOR_USER is None or node == FILTER_FOR_USER:
        to_process.append(dask.delayed(run_me)(node, *future))

dask.compute(*to_process)
Yes, this looks a bit messy because run_me is a very big function; as of now I can't modularize it better, though maybe I will in the future. The issue is that this works fine with 5 workers or fewer, but as soon as I increase the number of workers I get a serialization error. A stripped-down sketch of the pattern follows, and then the full traceback.
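Here is a simplified, runnable sketch of the same scatter + delayed pattern (the data, run_me, and worker counts here are dummies; my real objects are the ones scattered above):

# Simplified, runnable sketch of the scatter + delayed pattern.
# The shared objects and run_me here are dummies standing in for my real ones.
import dask
import numpy as np
from dask.distributed import Client

def run_me(node, shared_array, shared_value):
    # stand-in for the real (much larger) per-node computation
    return float(shared_array.sum() * shared_value) + node

if __name__ == "__main__":
    client = Client(n_workers=2, threads_per_worker=1)
    # scatter the large shared inputs once so every task reuses the same copies
    shared = client.scatter([np.arange(1_000), np.float64(0.68)])
    tasks = [dask.delayed(run_me)(node, *shared) for node in range(10)]
    results = dask.compute(*tasks)
    client.shutdown()

With the real code and more than 5 workers, the error I get is: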
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
for key, value in data.items()
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
if type(value) is Serialize
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
TypeError: ('Could not serialize object of type float64.', '0.68')
distributed.comm.utils - ERROR - ('Could not serialize object of type float64.', '0.68')
Again, this is very weird, because if I run this on a Linux server that has 35 cores and set the number of workers to 30, it works fine. I am not sure what the issue is. Is this specific to my local machine? I can look into the serialization issue, but why does it only work with 5 or fewer workers?
Thanks in advance for any help.
The error says that there is some object that you're trying to send to a worker that is not serializable. The type is a float64, which is maybe a numpy.float64 object? I don't really know given what you've said. I've verified that Dask moves numpy.float64 objects around just fine:
In [1]: from dask.distributed import Client
In [2]: client = Client()
In [3]: import numpy as np
In [4]: x = np.float64(1)
In [5]: future = client.scatter(x)
In [6]: future.result()
Out[6]: 1.0
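If you want to narrow it down on your side, one thing you could try is round-tripping each object you pass to client.scatter through cloudpickle and seeing which one fails. A sketch (the objects list here is a stand-in for your real inputs; Dask's serialization path is not literally cloudpickle, but this usually flushes out the offending object):

# Sketch: serialize each candidate object individually to find the one that breaks.
# Replace the objects list with the real things you pass to client.scatter.
import cloudpickle
import numpy as np

objects = [np.float64(0.68), {"a": 1}, [1, 2, 3]]

for i, obj in enumerate(objects):
    try:
        cloudpickle.loads(cloudpickle.dumps(obj))
    except Exception as exc:
        print(f"object {i} ({type(obj)!r}) failed to serialize: {exc}")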
I encourage you to provide an MCVE. See https://stackoverflow.com/help/minimal-reproducible-example