Tags: dask, dask-dataframe, dask-ml

How to efficiently cluster a dataframe column of numpy arrays


I started to look into Dask as I need to handle a large dataset that doesn't fit in RAM.

The data are saved in parquet files as dataframes with a column "coordinates" that contains NumPy arrays of shape (2,), i.e. the dtype of the column is "object", something like this:

>>> import numpy as np
>>> import pandas as pd
>>> df = pd.DataFrame({"coordinates": [np.array([1, 2]), np.array([3, 4])]})
>>> df
  coordinates
0      [1, 2]
1      [3, 4]
>>> df.dtypes
coordinates    object
dtype: object

Now, I want to find clusters in these coordinates using dask_ml.cluster.KMeans.

It looks like this:

import dask.dataframe as dd
from dask_ml.cluster import KMeans

df = dd.read_parquet("*.parquet")
arr = df["coordinates"].to_dask_array(lengths=True)

kmeans = KMeans()
kmeans.fit(arr)

print(kmeans.cluster_centers_)

Sadly, the fit function can't handle the array:

ValueError: Expected 2D array, got 1D array instead

The problem seems to be that "arr" is an array of arrays with dtype object, i.e. array([array([...], dtype=float32), ...], dtype=object), instead of a 2D ndarray array([[...], ...], dtype=float32). So I have to convert df["coordinates"] to a proper array by stacking:

import dask.array as da

arr = da.stack(df["coordinates"].to_dask_array(lengths=True))

But it seems that this tries to create arr by a sequential in-memory operation (i.e. using a single core) and is thus not feasible.

NB: I also tried wrapping da.stack in delayed, hoping this would allow Dask to defer and distribute the needed operations until kmeans.fit is called, but this throws an error as well. Since I'm very new to Dask and its concepts, this might also not be what delayed is actually meant for.

What would be the Dask-way to handle this problem? How can I transform my df["coordinates"] into an array that does not completely reside in memory but can be fit?


Solution

  • Up top, you should know that you shouldn't do this in pandas either, not just in dask. dask.dataframe is built on pandas, and pandas is built on NumPy. NumPy can hold arrays of fixed-width, strongly typed numerical data (e.g. float64, int16, etc.) or objects. Fixed-type arrays occupy contiguous blocks of memory and are very fast to work with; object arrays, on the other hand, require arrays of pointers to the object addresses and are much slower. So it is much better practice to use two columns of float data than a single object column in which each cell points to a NumPy array holding two floats.
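
    To make the difference concrete, here is a small pandas-only sketch contrasting the two layouts (the column names "x" and "y" are only illustrative):

    import numpy as np
    import pandas as pd

    # Object layout: each cell holds a pointer to a separate 2-element array.
    df_obj = pd.DataFrame({"coordinates": [np.array([1.0, 2.0]), np.array([3.0, 4.0])]})
    print(df_obj.dtypes)       # coordinates    object

    # Fixed-type layout: one float64 column per coordinate, stored contiguously.
    df_flat = pd.DataFrame({"x": [1.0, 3.0], "y": [2.0, 4.0]})
    print(df_flat.dtypes)      # x    float64
                               # y    float64
    print(df_flat.to_numpy())  # a plain (2, 2) float64 ndarray, exactly what KMeans expects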

    Now, for dask.dataframe. Because dask has to schedule work and allocate memory for partitions before the calculations reveal the results, it has even less support for object-type columns than pandas does. You can read in a parquet file laid out as you have it, but things may break later on, as you've discovered.

    Good news is you can easily map an operation to expand the column into multiple fixed-type columns using dask.dataframe.map_partitions:

    import numpy as np
    import pandas as pd

    # For each partition (a pandas Series of 2-element arrays), stack the arrays
    # into an (n, 2) float block and wrap it in a DataFrame with columns "x"/"y".
    coord_df = df["coordinates"].map_partitions(
        lambda sub: pd.DataFrame(
            np.vstack(sub), columns=["x", "y"], index=sub.index
        ),
        # meta tells dask the resulting column names and dtypes up front
        meta=[("x", np.float64), ("y", np.float64)],
    )
    # compute chunk sizes so the resulting dask array has known lengths
    arr = coord_df.to_dask_array(lengths=True)
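
    With the column expanded, arr is a regular two-dimensional dask array of float64, so the rest of your original snippet should work unchanged, e.g.:

    from dask_ml.cluster import KMeans

    kmeans = KMeans()
    kmeans.fit(arr)                  # each chunk is now a plain (n, 2) float64 block
    print(kmeans.cluster_centers_)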