Typical ML pipelines process pandas or Dask dataframes into a form that can be passed to ML models. Many dask-ml models, however, cannot accept Dask dataframes because Dask does not track the number of rows in each partition. Calling the fit
method raises a Cannot fit on dask.dataframe due to unknown partition lengths error.
What should I do so that I can pass a Dask dataframe to a dask-ml model?
Here is an example:
import dask.dataframe as dd
import pandas as pd
from dask_ml.cluster import KMeans
df = dd.from_pandas(pd.DataFrame({'A': [1, 2, 3, 4, 5],
                                  'B': [6, 7, 8, 9, 10]}),
                    npartitions=2)
kmeans = KMeans()
kmeans.fit(df)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-53-6c1545864b12> in <module>()
6
7 kmeans = KMeans()
----> 8 kmeans.fit(df)
~/anaconda3/envs/pds/lib/python3.6/site-packages/dask_ml/cluster/k_means.py in fit(self, X, y)
187
188 def fit(self, X, y=None):
--> 189 X = self._check_array(X)
190 labels, centroids, inertia, n_iter = k_means(
191 X,
~/anaconda3/envs/pds/lib/python3.6/site-packages/dask_ml/utils.py in wraps(*args, **kwargs)
298 def wraps(*args, **kwargs):
299 with _timer(f.__name__, _logger=logger, level=level):
--> 300 results = f(*args, **kwargs)
301 return results
302
~/anaconda3/envs/pds/lib/python3.6/site-packages/dask_ml/cluster/k_means.py in _check_array(self, X)
159 elif isinstance(X, dd.DataFrame):
160 raise TypeError(
--> 161 "Cannot fit on dask.dataframe due to unknown " "partition lengths."
162 )
163
TypeError: Cannot fit on dask.dataframe due to unknown partition lengths.
This is now supported on dask-ml master via https://github.com/dask/dask-ml/pull/393, and the fix will be included in the Dask-ML 0.10 release.