
How to pass SparseVectors to `mllib` in pyspark


I am using pyspark 1.6.3 through Zeppelin with python 3.5.

I am trying to implement Latent Dirichlet Allocation (LDA) using PySpark's CountVectorizer and LDA. First, the problem: here is the code I am using. Let `df` be a Spark DataFrame with tokenized text in a column `'tokenized'`:

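from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA

vectors = 'vectors'
cv = CountVectorizer(inputCol='tokenized', outputCol=vectors)
model = cv.fit(df)
df = model.transform(df)

# zipWithIndex yields (row, index); swap each pair to [index, row]
corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
ldaModel = LDA.train(corpus, k=25)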
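For context, CountVectorizer turns each token list into a vector of term counts over a learned vocabulary. The sketch below is a plain-Python approximation, not Spark's implementation: it assumes the documented behavior of ranking the vocabulary by term frequency across the corpus, ignores the `vocabSize`, `minDF` and `minTF` parameters, and the alphabetical tie-breaking and the `(size, {index: count})` representation are assumptions of this sketch. It also suggests why index 0 carries the largest counts in the `corpus` output shown further down.

```python
from collections import Counter


def count_vectorize(docs):
    """Plain-Python approximation of CountVectorizer (sketch, not Spark's code)."""
    # Corpus-wide term frequency for every token.
    totals = Counter(token for doc in docs for token in doc)
    # Most frequent term gets index 0; ties broken alphabetically (sketch assumption).
    vocab = sorted(totals, key=lambda term: (-totals[term], term))
    index = {term: i for i, term in enumerate(vocab)}
    vectors = []
    for doc in docs:
        counts = Counter(doc)
        # Sparse entries kept in index order, as a SparseVector stores them.
        entries = dict(sorted((index[t], float(c)) for t, c in counts.items()))
        vectors.append((len(vocab), entries))
    return vocab, vectors


vocab, vecs = count_vectorize([["a", "b", "a"], ["b", "c"]])
print(vocab)  # ['a', 'b', 'c']
print(vecs)   # [(3, {0: 2.0, 1: 1.0}), (3, {1: 1.0, 2: 1.0})]
```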

This code is adapted, more or less, from the PySpark API docs. On the call to `LDA.train` I get the following error:

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

Searching online suggests that this error is caused by a type mismatch.

So let's look at the types that LDA expects and that CountVectorizer produces. From the Spark docs, here is another example of a sparse vector going into LDA:

>>> from pyspark.mllib.linalg import Vectors, SparseVector
>>> from pyspark.mllib.clustering import LDA
>>> data = [
...     [1, Vectors.dense([0.0, 1.0])],
...     [2, SparseVector(2, {0: 1.0})],
... ]
>>> rdd = sc.parallelize(data)
>>> model = LDA.train(rdd, k=2, seed=1)
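Both entries in the docs example are 2-dimensional vectors: `SparseVector(2, {0: 1.0})` stores only the size and the nonzero index-to-value pairs. A minimal plain-Python sketch of that representation, using a hypothetical helper `sparse_to_dense` (not part of the pyspark API), shows that it is the dense vector `[1.0, 0.0]`, so mixing dense and sparse entries of the same size is fine.

```python
def sparse_to_dense(size, entries):
    """Expand a (size, {index: value}) sparse representation into a dense list.

    Hypothetical helper for illustration only; not a pyspark function.
    """
    dense = [0.0] * size
    for i, value in entries.items():
        # Every stored index must fall inside the declared size.
        if not 0 <= i < size:
            raise IndexError("index {} out of range for size {}".format(i, size))
        dense[i] = value
    return dense


print(sparse_to_dense(2, {0: 1.0}))  # [1.0, 0.0]
```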

I ran this myself, and this is what the RDD (which I called `testrdd`) looks like:

>> testrdd.take(2)

[[1, DenseVector([0.0, 1.0])], [2, SparseVector(2, {0: 1.0})]]

On the other hand, if I go back to my original code and look at `corpus`, the RDD built from the output of CountVectorizer, I see (edited to remove extraneous bits):

>> corpus.take(3)

[[0, Row(vectors=SparseVector(130593, {0: 30.0, 1: 13.0, ...
 [1, Row(vectors=SparseVector(130593, {0: 52.0, 1: 44.0, ...
 [2, Row(vectors=SparseVector(130593, {0: 14.0, 1: 6.0, ...
]

So the code I adapted from the docs doesn't produce `[index, SparseVector]` pairs, but `[index, Row(SparseVector)]` pairs... or something like that?

Questions:

  • Is the Row wrapper around the SparseVector what is causing this error?
  • If so, how do I get rid of the Row object? Row is a DataFrame concept, but I already used `df.rdd` to convert to an RDD; what else would I need to do?

Solution

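  • That is most likely the problem: the error message names `pyspark.sql.types._create_row`, the Row constructor, which the JVM side cannot unpickle. Just extract the vector from the Row object: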

    corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]['vectors']]).cache()
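The fix can be illustrated without a Spark cluster. The sketch below is a plain-Python emulation under stated assumptions: a `namedtuple` stands in for `pyspark.sql.Row`, `enumerate` stands in for `zipWithIndex()` (which yields `(element, index)` pairs), and each vector is modeled as a `(size, {index: value})` tuple rather than a real `SparseVector`. The stand-in does not support `row['vectors']`, so it unwraps with `x[0][0]` instead.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row with a single field named 'vectors'.
Row = namedtuple("Row", ["vectors"])

rows = [
    Row(vectors=(5, {0: 30.0, 1: 13.0})),
    Row(vectors=(5, {0: 52.0, 1: 44.0})),
]

# Emulate rdd.zipWithIndex(): each element becomes (element, index).
zipped = [(row, i) for i, row in enumerate(rows)]

# Original map: [index, Row(...)]; the Row wrapper is still there.
corpus_wrapped = [[x[1], x[0]] for x in zipped]

# Fixed map: pull the vector out of the Row (positional access on the stand-in).
corpus_fixed = [[x[1], x[0][0]] for x in zipped]

# Alternative: unwrap first, then attach the index.
vectors_only = [row[0] for row in rows]
corpus_alt = [[i, v] for i, v in enumerate(vectors_only)]

print(corpus_wrapped[0])  # [0, Row(vectors=(5, {0: 30.0, 1: 13.0}))]
print(corpus_fixed[0])    # [0, (5, {0: 30.0, 1: 13.0})]
```

Unwrapping before indexing gives the same pairs; in PySpark that would presumably read `df.select(vectors).rdd.map(lambda row: row[0]).zipWithIndex().map(lambda x: [x[1], x[0]])`, which leaves the original index-swapping lambda untouched.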