I am new to Python and RAPIDS, and I am trying to recreate scikit-learn KMeans on a multi-node/multi-GPU setup (I have 2 GPUs) using Dask and RAPIDS (I am using RAPIDS with its Docker image, which mounts a Jupyter Notebook too). The code I show below (along with an example using the Iris dataset) freezes, and the Jupyter notebook cell never finishes. I tried to use the %debug
magic and also the Dask dashboard, but I could not draw any clear conclusions (the only conclusion I have is that it could be due to device_m_csv.iloc
, but I am not sure about it). Another possibility is that I am forgetting some wait()
, compute()
, or persist()
call (really, I am not sure on what occasions they should be used correctly).
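For reference, this is a small sketch of how I currently understand persist(), wait(), and compute() are meant to be used (please correct me if this understanding is wrong; the example is synthetic and not part of my actual code):
import dask.array as da
from dask.distributed import Client, wait
client = Client()  # assumes a local cluster is available
x = da.random.random((1000, 4), chunks=(250, 4))
x = x.persist()    # persist(): start computing in the background and keep results in worker memory
wait(x)            # wait(): block until the persisted tasks have actually finished
result = x.mean().compute()  # compute(): pull the final result back to the client as a concrete object
client.close()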
I will explain the code for better reading: I read the CSV with chunksize = '2kb'
so that it is split into two partitions (one per GPU worker), and then I separate the dataframe into the data columns (more commonly known as X
) and the labels column (more commonly known as y
).
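To confirm that the chunksize actually produces the two partitions I expect (this check is not in my original code; it is just how I believe it can be verified), something like this should print 2:
print('Partitions:', device_m_csv.npartitions)  # expecting 2, one partition per GPU worker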
Sorry for not being able to offer more data, but I couldn't get it. Whatever is necessary to solve the doubt, I will be happy to offer it.
Where do you think the problem might be?
Thank you very much in advance.
%%time
# Import libraries and show its versions
import numpy as np; print('NumPy Version:', np.__version__)
import pandas as pd; print('Pandas Version:', pd.__version__)
import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
import nvstrings, nvcategory
import cupy; print('cuPY Version:', cupy.__version__)
import cudf; print('cuDF Version:', cudf.__version__)
import cuml; print('cuML Version:', cuml.__version__)
import dask; print('Dask Version:', dask.__version__)
import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
import seaborn as sns; print('SeaBorn Version:', sns.__version__)
#import time
#import warnings
from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait
from dask_ml.cluster import KMeans as skmKMeans
from dask_cuda import LocalCUDACluster
from sklearn import metrics
from sklearn.cluster import KMeans as skKMeans
from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
from cuml.cluster import KMeans as cuKMeans
from cuml.dask.cluster.kmeans import KMeans as cumKMeans
from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score
# Configure matplotlib library
import matplotlib.pyplot as plt
%matplotlib inline
# Configure seaborn library
sns.set()
#sns.set(style="white", color_codes=True)
%config InlineBackend.figure_format = 'svg'
# Configure warnings
#warnings.filterwarnings("ignore")
####################################### KMEANS #############################################################
# Create local cluster
cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
# Identify number of workers
n_workers = len(client.has_what().keys())
# Read data in host memory
device_m_csv = dask_cudf.read_csv('./DataSet/iris.csv', header = 0, delimiter = ',', chunksize='2kB') # Get complete CSV. Chunksize is 2kb for getting 2 partitions
#x = host_data.iloc[:, [0,1,2,3]].values
device_m_data = device_m_csv.iloc[:, [0, 1, 2, 3]] # Get data columns
device_m_labels = device_m_csv.iloc[:, 4] # Get labels column
# Plot data
#sns.pairplot(device_csv.to_pandas(), hue='variety');
# Define variables
label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type
# Create KMeans
cu_m_kmeans = cumKMeans(init='k-means||',
                        n_clusters=len(device_m_labels.unique()),
                        oversampling_factor=40,
                        random_state=0)
# Fit data in KMeans
cu_m_kmeans.fit(device_m_data)
# Predict data
cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()
# Check score
#print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
#print('adjusted_rand_score: ', sk_adjusted_rand_score(device_m_labels, cu_m_kmeans.labels_))
#print('silhouette_score: ', sk_silhouette_score(device_m_data.to_pandas(), cu_m_kmeans_labels_predicted))
# Close local cluster
client.close()
cluster.close()
Iris dataset example: [image of the dataset]
@Corey, this is my output using your code:
NumPy Version: 1.17.5
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.1
cuPY Version: 6.7.0
cuDF Version: 0.12.0
cuML Version: 0.12.0
Dask Version: 2.10.1
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.12.0
MatPlotLib Version: 3.1.3
SeaBorn Version: 0.10.0
Cluster centers:
0 1 2 3
0 5.006000 3.428000 1.462000 0.246000
1 5.901613 2.748387 4.393548 1.433871
2 6.850000 3.073684 5.742105 2.071053
adjusted_rand_score: 0.7302382722834697
silhouette_score: 0.5528190123564102
I modified your reproducible example slightly and was able to produce an output on the most recent nightly of RAPIDS.
This is the output of the script.
(cuml_dev_2) cjnolet@deeplearn ~ $ python ~/kmeans_mnmg_reproduce.py
NumPy Version: 1.18.1
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.2.post1
cuPY Version: 7.2.0
cuDF Version: 0.13.0a+3237.g61e4d9c
cuML Version: 0.13.0a+891.g4f44f7f
Dask Version: 2.11.0+28.g10db6ba
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.13.0a+3237.g61e4d9c
MatPlotLib Version: 3.2.0
SeaBorn Version: 0.10.0
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/dask/array/random.py:27: FutureWarning: dask.array.random.doc_wraps is deprecated and will be removed in a future version
FutureWarning,
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/distributed/dashboard/core.py:79: UserWarning:
Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
warnings.warn("\n" + msg)
bokeh.server.util - WARNING - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
Cluster centers:
0 1 2 3
0 5.883607 2.740984 4.388525 1.434426
1 5.006000 3.428000 1.462000 0.246000
2 6.853846 3.076923 5.715385 2.053846
adjusted_rand_score: 0.7163421126838475
silhouette_score: 0.5511916046195927
And here is the modified script that produced this output:
# Import libraries and show its versions
import numpy as np; print('NumPy Version:', np.__version__)
import pandas as pd; print('Pandas Version:', pd.__version__)
import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
import nvstrings, nvcategory
import cupy; print('cuPY Version:', cupy.__version__)
import cudf; print('cuDF Version:', cudf.__version__)
import cuml; print('cuML Version:', cuml.__version__)
import dask; print('Dask Version:', dask.__version__)
import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
import seaborn as sns; print('SeaBorn Version:', sns.__version__)
#import time
#import warnings
from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait
from dask_ml.cluster import KMeans as skmKMeans
from dask_cuda import LocalCUDACluster
from sklearn import metrics
from sklearn.cluster import KMeans as skKMeans
from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
from cuml.cluster import KMeans as cuKMeans
from cuml.dask.cluster.kmeans import KMeans as cumKMeans
from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score
# Configure matplotlib library
import matplotlib.pyplot as plt
# Configure seaborn library
sns.set()
#sns.set(style="white", color_codes=True)
# Configure warnings
#warnings.filterwarnings("ignore")
####################################### KMEANS #############################################################
# Create local cluster
cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
# Identify number of workers
n_workers = len(client.has_what().keys())
# Read data in host memory
from sklearn.datasets import load_iris
loader = load_iris()
#x = host_data.iloc[:, [0,1,2,3]].values
device_m_data = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.data)), npartitions=2) # Get data columns
device_m_labels = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.target)), npartitions=2)
# Plot data
#sns.pairplot(device_csv.to_pandas(), hue='variety');
# Define variables
label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type
# Create KMeans
cu_m_kmeans = cumKMeans(init='k-means||',
                        n_clusters=len(np.unique(loader.target)),
                        oversampling_factor=40,
                        random_state=0)
# Fit data in KMeans
cu_m_kmeans.fit(device_m_data)
# Predict data
cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()
# Check score
print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
print('adjusted_rand_score: ', sk_adjusted_rand_score(loader.target, cu_m_kmeans_labels_predicted.values.get()))
print('silhouette_score: ', sk_silhouette_score(device_m_data.compute().to_pandas(), cu_m_kmeans_labels_predicted))
# Close local cluster
client.close()
cluster.close()
Can you please provide your output for the versions of these libraries? I would also recommend running the modified script to see whether it runs successfully for you. If not, we can dive in further to find out whether it's Docker-related, RAPIDS-version-related, or something else.
If you have access to the command prompt that's running your Jupyter notebook, it might be helpful to enable logging by passing verbose=True
when constructing the KMeans
object. This can help us isolate where things are getting stuck.
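For example, the construction from the script above would become something like this (assuming the verbose parameter is accepted by the KMeans constructor in your cuML version):
cu_m_kmeans = cumKMeans(init='k-means||',
                        n_clusters=3,
                        oversampling_factor=40,
                        random_state=0,
                        verbose=True)  # emit progress/debug logging while fitting
The log messages should then show up in the terminal running the notebook (or in the Dask worker logs), which should tell us which stage hangs.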