Tags: python-3.x, multiprocessing, dask, dask-distributed, dask-delayed

Dask distributed library giving serialization error


I initialized the cluster with 10 workers and 4 threads per worker. I am running this on a 12-core laptop.

    cluster = makeIndividualDashboard.LocalCluster(n_workers=10, threads_per_worker=4)
    client = makeIndividualDashboard.Client()
    runOna(client)
    client.shutdown()

Below is the code where I do the cluster computing.

    st = settings.as_dict()
    new_settings = namedtuple("Settings", st.keys())(*st.values())
    to_process = []
    client.cluster.scale(10)
    if mongoConnection:
        mongo_c = True
    else:
        mongo_c = None
    future = client.scatter([net, new_settings, avgNodesConnected, kcoreByGroup, averageTeamDensity,
                             edgesInByAttributeTableMeans, edgesInByAttributeTable, crossTeamTiesTable,
                             descendentLookup, groupDegreeTable, respondentDegreeTable, degreeTable,
                             orgTeamTree, teamMembership, graphId, selectionRange, criteria,
                             onlyForNodes, hashIds, useEnvironment, rollupToLeaders, averageTeamSize,
                             meanCrossInTiesPct, meanCrossOutTiesPct, meanCrossAllTiesPct, mongo_c])
    for node in nodes:
        if FILTER_FOR_USER is None or node == FILTER_FOR_USER:
            to_process.append(dask.delayed(run_me)(node, *future))

    dask.compute(*to_process)

Yes, this looks a bit messy, because run_me is a very big function that I cannot modularize right now; maybe I will in the future. The issue is that this works fine with 5 workers or fewer, but as soon as I increase the number of workers, I get a serialization error.

    distributed.protocol.core - CRITICAL - Failed to Serialize
    Traceback (most recent call last):
      File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
        for key, value in data.items()
      File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
        if type(value) is Serialize
      File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    TypeError: ('Could not serialize object of type float64.', '0.68')
    distributed.comm.utils - ERROR - ('Could not serialize object of type float64.', '0.68')

Again, this is very strange, because when I run it on a Linux server with 35 cores and set the number of workers to 30, it works fine. Is this specific to my local machine? I can look into the serialization issue, but why does this work only with 5 workers?

Thanks in advance for any help.


Solution

  • The error says that some object you're trying to send to a worker is not serializable. The type is float64, which may be a numpy.float64 object; I can't tell from what you've posted. I've verified that Dask moves NumPy float64 objects around just fine:

    In [1]: from dask.distributed import Client

    In [2]: client = Client()

    In [3]: import numpy as np

    In [4]: x = np.float64(1)

    In [5]: future = client.scatter(x)

    In [6]: future.result()
    Out[6]: 1.0

    I encourage you to provide an MCVE. See https://stackoverflow.com/help/minimal-reproducible-example
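
One way to narrow this down while building the MCVE is to test each scattered argument on its own. The sketch below is only a rough approximation: it uses the standard-library pickle module, while Dask can also fall back on cloudpickle and its own serializers, so an object that fails here may still travel fine through Dask. The helper name find_unpicklable is hypothetical, and the values in candidates are made-up stand-ins for the real arguments passed to client.scatter.

```python
import pickle
import threading


def find_unpicklable(named_objects):
    """Return {name: error message} for every object that pickle rejects."""
    failures = {}
    for name, obj in named_objects.items():
        try:
            pickle.dumps(obj)
        except Exception as exc:  # pickle raises several exception types
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


# Illustrative stand-ins only; replace them with the real scattered objects.
candidates = {
    "averageTeamSize": 0.68,      # a plain float pickles without trouble
    "graphId": "graph-1",         # so does a string
    "mongo_c": threading.Lock(),  # a lock object cannot be pickled
}

failures = find_unpicklable(candidates)
for name, message in failures.items():
    print(f"{name}: {message}")
```

If every argument passes a check like this, the failing object is probably created inside run_me or returned from it, and a small reproducible example becomes even more useful.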