I am using Apache Spark 2.1.2 and I want to use Latent Dirichlet allocation (LDA).
Previously I was using the org.apache.spark.mllib package and I could run this without any problems, but now, after switching to spark.ml, I am getting an error.
val lda = new LDA().setK(numTopics).setMaxIter(numIterations)
val docs = spark.createDataset(documents)
val ldaModel = lda.fit(docs)
As you may have noticed, I'm converting the documents RDD to a Dataset object, and I'm not sure whether this is the correct way of doing it.
On that last line, the .fit call gives me the following error:
java.lang.IllegalArgumentException: Field "features" does not exist.
My docs
dataset looks like this:
scala> docs.take(2)
res28: Array[(Long, org.apache.spark.ml.linalg.Vector)] = Array((0,(7336,[1,2,4,5,12,13,19,24,26,42,48,49,57,59,63,73,81,89,99,106,113,114,141,151,157,160,177,181,198,261,266,267,272,297,307,314,315,359,383,385,410,416,422,468,471,527,564,629,717,744,763,837,890,928,932,951,961,1042,1134,1174,1305,1604,1653,1850,2119,2159,2418,2634,2836,3002,3132,3594,4103,4316,4852,5065,5107,5632,5945,6378,6597,6658],[1.0,1.0,1.0.......
This is my original documents RDD, before converting it to a Dataset:
documents: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[2520]
How can I get rid of the error above?
The main difference between spark.mllib and spark.ml is that spark.ml operates on DataFrames (or Datasets), while mllib operates directly on RDDs with a rigidly defined structure.
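You can see where your error comes from by inspecting the schema of the Dataset you built from the tuple RDD (a quick check in the shell; docs here is the Dataset from your snippet):

docs.printSchema()
// shows the two default tuple column names, _1 and _2;
// there is no column called "features", which is the input column ml's LDA looks for by default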
You don't need to do much to make your code work with spark.ml, but I'd still suggest going through the documentation and understanding the differences, because you will run into more and more of them as you shift further towards spark.ml. A good starting page with all the basics is https://spark.apache.org/docs/2.1.0/ml-pipeline.html.
As for your code, all that is needed is to give the columns the names LDA expects and it should work just fine. Probably the easiest way to do so is to use the implicit toDF method on the underlying RDD:
import spark.implicits._
val lda = new LDA().setK(numTopics).setMaxIter(numIterations)
val docs = documents.toDF("label", "features")  // explicit names instead of the default _1 / _2
val ldaModel = lda.fit(docs)
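Equivalently, if you'd rather keep your spark.createDataset call, you can rename the columns on the resulting Dataset itself; both variants below are just sketches of the same renaming idea:

// rename both columns at once on the Dataset built from the RDD
val docs = spark.createDataset(documents).toDF("label", "features")
// or rename only the vector column and leave the id column as _1
// val docs = spark.createDataset(documents).withColumnRenamed("_2", "features")

Either way, once there is a Vector column named "features" (or you point LDA at your own column with setFeaturesCol), the fit call should go through.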