I am trying to vectorize the dask.dataframe with dask HashingVectorizer. I want the vectorization results to stay in the cluster (distributed system). That's why I am using client.persist
when I try to transform the data. But for some reason, I am getting the error below.
Traceback (most recent call last):
File "/home/dodzilla/my_project/components_with_adapter/vectorizers/base_vectorizer.py", line 112, in hybrid_feature_vectorizer
CLUSTERING_FEATURES=self.clustering_features)
File "/home/dodzilla/my_project/components_with_adapter/vectorizers/text_vectorizer.py", line 143, in vectorize
X = self.client.persist(fitted_vectorizer.transform, combined_data)
File "/home/dodzilla/.local/lib/python3.6/site-packages/distributed/client.py", line 2860, in persist
assert all(map(dask.is_dask_collection, collections))
AssertionError
I can't share the data but all of the necessary information about the data is as below:
>>>type(combined_data)
<class 'dask.dataframe.core.Series'>
>>>type(combined_data.compute())
<class 'pandas.core.series.Series'>
>>>combined_data.compute().shape
12
A minimal working example can be found below. Below, in the code snippet, combined_data
holds the merged columns. Meaning: all of the columns are merged into 1 column. Data has 12 rows. All of the values inside the rows are string. This is the code where I am getting the error:
from stop_words import get_stop_words
from dask_ml.feature_extraction.text import HashingVectorizer as daskHashingVectorizer
import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client
def convert_dataframe_to_single_text(documents):
"""
Combine all of the columns into 1 column.
"""
if type(documents) is dask.dataframe.core.DataFrame:
cols = documents.columns
documents['combined'] = documents[cols].apply(func=(lambda row: ' '.join(row.values.astype(str))), axis=1,
meta=('str'))
document_texts = documents.drop(cols, axis=1)
else:
raise TypeError('Wrong type of data. Expected Pandas DF or Dask DF but received ', type(documents))
return document_texts
# Init the client.
client = Client('localhost:8786')
# Get stopwords
stopwords = get_stop_words(language="english")
# Create dask dataframe from pandas dataframe
data = {'Name':['Tom', 'nick', 'krish', 'jack'], 'Age':["twenty", "twentyone", "nineteen", "eighteen"]}
df = pd.DataFrame(data)
df = dd.from_pandas(df, npartitions=1)
# Init the vectorizer
vectorizer = daskHashingVectorizer(stop_words=stopwords, alternate_sign=False,
norm=None, binary=False,
n_features=10000)
# Combine all of to columns into 1 column.
combined_data = convert_dataframe_to_single_text(df)
# Fit the vectorizer.
fitted_vectorizer = client.persist(vectorizer.fit(combined_data))
# Transform the data.
X = client.persist(fitted_vectorizer.transform, combined_data)
I hope the information is enough.
Important note: I am not getting any kind of error when I say client.compute
but from what I understand this doesn't work in the cluster of machines and instead it runs in the local machine. And it returns a csr matrix instead of a lazily evaluated dask.array
.
This is not how I was supposed to use client.persist
. Functions I was looking for are client.submit
and client.map
... In my case client.submit
solved my issue.