Search code examples
scalaapache-sparksimilaritytrigonometry

Cosine Similarity via DIMSUM in Spark


I have a very simple code to try Cosine Similarity:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry,   CoordinateMatrix, RowMatrix}

val rows= Array(((1,2,3,4,5),(1,2,3,4,5),(1,2,4,5,8),(3,4,1,2,7),(7,7,7,7,7)))
val mat = new RowMatrix(rows)

val simsPerfect = mat.columnSimilarities()
val simsEstimate = mat.columnSimilarities(0.8)

I run this code on Amazon AWS which has Spark 1.5 however I got the following message for the last two lines: "Erroe: value columnSimilarities is not a memeber of org.apache.spark.rdd.RDD[(int,int)]"

Could you please help to resolve this issue?


Solution

  • I found the answer. I need to convert the matrix to RDD. Here is the right code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix, RowMatrix}
    import org.apache.spark.rdd._
    import org.apache.spark.mllib.linalg._
    
    
    def matrixToRDD(m: Matrix): RDD[Vector] = {
    val columns = m.toArray.grouped(m.numRows)
    val rows = columns.toSeq.transpose // Skip this if you want a column-major RDD.
    val vectors = rows.map(row => new DenseVector(row.toArray))
    sc.parallelize(vectors)
    }
    
    val dm: Matrix = Matrices.dense(5, 5,Array(1,2,3,4,5,1,2,3,4,5,1,2,4,5,8,3,4,1,2,7,7,7,7,7,7))
    val rows = matrixToRDD(dm)
    val mat = new RowMatrix(rows)
    val simsPerfect = mat.columnSimilarities()
    val simsEstimate = mat.columnSimilarities(0.8)
    
    println("Pairwise similarities are: " + simsPerfect.entries.collect.mkString(", "))
    
    println("Estimated pairwise similarities are: " +     simsEstimate.entries.collect.mkString(", "))
    

    Cheers