
How to compute the dot product of two distributed RowMatrices in Apache Spark?


Let Q be a distributed RowMatrix in Spark. I want to compute the product of Q with its transpose Q', i.e. Q * Q'.

However, although RowMatrix does have a multiply() method, it only accepts a local Matrix as its argument.
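What RowMatrix.multiply() does accept is a local Matrix, so only the distributed-times-local case works out of the box. A minimal sketch of that case, assuming Spark MLlib (the RDD-based org.apache.spark.mllib API) is on the classpath and a local[2] master; the object name RowTimesLocal and the small example matrices are illustrative only:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Hypothetical example object: distributed RowMatrix times a local Matrix.
object RowTimesLocal {
  // Multiplies a 3x2 RowMatrix by a 2x2 local matrix and returns the rows
  // of the distributed 3x2 result, collected to the driver.
  def run(): Array[Array[Double]] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("row-times-local").setMaster("local[2]"))
    try {
      val q = new RowMatrix(sc.parallelize(Seq(
        Vectors.dense(1.0, 2.0),
        Vectors.dense(3.0, 4.0),
        Vectors.dense(5.0, 6.0))))
      // Local 2x2 matrix 2 * I; Matrices.dense takes values in column-major order.
      // Spark requires this local argument to be a dense matrix.
      val b: Matrix = Matrices.dense(2, 2, Array(2.0, 0.0, 0.0, 2.0))
      // Allowed: distributed (RowMatrix) times local (Matrix).
      // A second RowMatrix cannot be passed here, because multiply expects a Matrix.
      val product: RowMatrix = q.multiply(b)
      product.rows.collect().map(_.toArray)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    run().foreach(row => println(row.mkString(" ")))
}
```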

Code illustration (Scala):

val phi = new RowMatrix(phiRDD)            // phiRDD is an instance of RDD[Vector]
val phiTranspose = transposeRowMatrix(phi) // transposeRowMatrix()
                                           // returns the transpose of a RowMatrix
val crossMat = ?                           // phi * phiTranspose
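The question does not show transposeRowMatrix(). One possible implementation, sketched under the assumption that row order should be preserved: attach artificial row indices, transpose through CoordinateMatrix (which has a transpose() method), then sort the rows back by index. The helper body, the object name and the example data are hypothetical, and a local[2] master is assumed:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

object TransposeRowMatrixExample {
  // Hypothetical helper standing in for the question's transposeRowMatrix().
  def transposeRowMatrix(m: RowMatrix): RowMatrix = {
    // RowMatrix rows carry no index, so attach artificial ones (RDD order).
    val indexed = new IndexedRowMatrix(
      m.rows.zipWithIndex.map { case (v, i) => IndexedRow(i, v) })
    // CoordinateMatrix supports transpose(); regrouping entries into rows shuffles.
    val transposed = indexed.toCoordinateMatrix().transpose().toIndexedRowMatrix()
    // Sort by row index so the RowMatrix keeps the transposed row order.
    // Caveat: rows with no stored entries (all-zero sparse rows) are dropped.
    new RowMatrix(transposed.rows.sortBy(_.index).map(_.vector))
  }

  def run(): Array[Array[Double]] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("transpose-row-matrix").setMaster("local[2]"))
    try {
      val phi = new RowMatrix(sc.parallelize(Seq(
        Vectors.dense(1.0, 2.0),
        Vectors.dense(3.0, 4.0),
        Vectors.dense(5.0, 6.0))))
      // Collect the 2x3 transpose as dense arrays for inspection.
      transposeRowMatrix(phi).rows.collect().map(_.toArray)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    run().foreach(row => println(row.mkString(" ")))
}
```

Note that this helper already needs a shuffle, which is part of why a plain RowMatrix is an awkward starting point for Q * Q'.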

Note that I want to multiply two distributed RowMatrices, not a distributed one by a local one.
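Entry by entry, Q * Q' is the matrix of pairwise dot products of the rows of Q, which is why it can be described as a dot product. Assuming Q is n × m with rows q_1 to q_n (notation introduced here):

```latex
(Q Q^{\top})_{ij} \;=\; \sum_{k=1}^{m} Q_{ik}\, Q_{jk} \;=\; \mathbf{q}_i \cdot \mathbf{q}_j,
\qquad Q Q^{\top} \in \mathbb{R}^{n \times n}
```

The result is n × n, so when Q has many rows the product itself is typically too large to be held as a local matrix, which is why both operands and the result need to stay distributed.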

One solution is to use an IndexedRowMatrix as follows:

val phi = new IndexedRowMatrix(phiRDD)  // phiRDD is an instance of RDD[IndexedRow]
val phiTranspose = transposeMatrix(phi) // transposeMatrix()
                                        // returns the transpose of a Matrix
val crossMat = phi.toBlockMatrix()
  .multiply(phiTranspose.toBlockMatrix())
  .toIndexedRowMatrix()
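Likewise, transposeMatrix() is not shown in the question. A minimal runnable sketch of the IndexedRowMatrix route, assuming a hypothetical transposeMatrix that simply round-trips through BlockMatrix.transpose, a local[2] master and a small example matrix. Counting the helper, this chain converts between IndexedRowMatrix and BlockMatrix five times:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

object IndexedCrossProductExample {
  // Hypothetical helper standing in for the question's transposeMatrix():
  // BlockMatrix provides transpose, so convert there and back.
  def transposeMatrix(m: IndexedRowMatrix): IndexedRowMatrix =
    m.toBlockMatrix().transpose.toIndexedRowMatrix()

  def run(): Array[Array[Double]] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("indexed-cross-product").setMaster("local[2]"))
    try {
      val phiRDD = sc.parallelize(Seq(
        IndexedRow(0L, Vectors.dense(1.0, 2.0)),
        IndexedRow(1L, Vectors.dense(3.0, 4.0)),
        IndexedRow(2L, Vectors.dense(5.0, 6.0))))
      val phi = new IndexedRowMatrix(phiRDD)
      val phiTranspose = transposeMatrix(phi)
      // Same chain as in the question: both operands become BlockMatrices.
      val crossMat = phi.toBlockMatrix()
        .multiply(phiTranspose.toBlockMatrix())
        .toIndexedRowMatrix()
      // Row order after the conversions is not guaranteed, so sort by index.
      crossMat.rows.collect().sortBy(_.index).map(_.vector.toArray)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    run().foreach(row => println(row.mkString(" ")))
}
```

Since the helper already builds a BlockMatrix internally, multiplying phi.toBlockMatrix() by its own transpose directly would avoid several of those conversions.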

However, I want to use RowMatrix methods such as tallSkinnyQR(), which means that I should convert crossMat to a RowMatrix using the .toRowMatrix() method:

val crossRowMat = crossMat.toRowMatrix()

and finally I can apply

crossRowMat.tallSkinnyQR()

but this process involves several conversions between distributed matrix types, and according to my reading of the MLlib Programming Guide this is expensive:

It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive.
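For the last step, a minimal sketch of calling tallSkinnyQR() on a RowMatrix. To keep it independent of the conversion chain, crossRowMat is built directly from the entries of Q * Q' for the small example Q = [[1, 2], [3, 4], [5, 6]]; that stand-in data, the object name and the local[2] master are assumptions. computeQ = false is used because, as far as I know, Spark derives Q by multiplying with the inverse of R, and R is singular for this rank-2 product:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object TallSkinnyQRExample {
  // Returns the R factor of a TSQR decomposition of a small RowMatrix.
  def run(): Matrix = {
    val sc = new SparkContext(
      new SparkConf().setAppName("tall-skinny-qr").setMaster("local[2]"))
    try {
      // Stand-in for crossRowMat: Q * Q' for Q = [[1, 2], [3, 4], [5, 6]].
      val crossRowMat = new RowMatrix(sc.parallelize(Seq(
        Vectors.dense(5.0, 11.0, 17.0),
        Vectors.dense(11.0, 25.0, 39.0),
        Vectors.dense(17.0, 39.0, 61.0))))
      // Only R is requested; the Q factor is left uncomputed.
      val qr = crossRowMat.tallSkinnyQR(computeQ = false)
      qr.R
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because R'R equals A'A for the upper-triangular R, |R(0, 0)| equals the Euclidean norm of the first column, sqrt(5² + 11² + 17²) = sqrt(435), which makes a quick sanity check.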

Could someone elaborate, please?


Solution

  • The only distributed matrix type that supports matrix-matrix multiplication is BlockMatrix. You have to convert your data accordingly; artificial row indices are good enough:

    new IndexedRowMatrix(
      rowMatrix.rows.zipWithIndex.map(x => IndexedRow(x._2,  x._1))
    ).toBlockMatrix match { case m => m.multiply(m.transpose) }
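The snippet above can be wrapped into a self-contained program. A sketch, assuming the same small example Q, a local[2] master and a hypothetical helper name timesOwnTranspose; it collects the product with toLocalMatrix() only because the example is tiny, and shows the one conversion needed to get back to a RowMatrix for methods like tallSkinnyQR():

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, IndexedRow, IndexedRowMatrix, RowMatrix}

object RowMatrixCrossProduct {
  // Hypothetical helper: Q * Q' for a RowMatrix, following the answer above
  // (artificial row indices -> IndexedRowMatrix -> BlockMatrix -> multiply).
  def timesOwnTranspose(rowMatrix: RowMatrix): BlockMatrix = {
    val m = new IndexedRowMatrix(
      rowMatrix.rows.zipWithIndex.map { case (v, i) => IndexedRow(i, v) }
    ).toBlockMatrix()
    m.multiply(m.transpose)
  }

  // Returns the collected product and the row count of the RowMatrix view.
  def run(): (Matrix, Long) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("row-matrix-cross-product").setMaster("local[2]"))
    try {
      val q = new RowMatrix(sc.parallelize(Seq(
        Vectors.dense(1.0, 2.0),
        Vectors.dense(3.0, 4.0),
        Vectors.dense(5.0, 6.0))))
      val product = timesOwnTranspose(q)
      // Back to a RowMatrix for RowMatrix-only methods such as tallSkinnyQR().
      // This view drops the row indices, so row order is not guaranteed.
      val asRowMatrix: RowMatrix = product.toIndexedRowMatrix().toRowMatrix()
      // Collecting to the driver is only reasonable because the example is tiny.
      (product.toLocalMatrix(), asRowMatrix.numRows())
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (local, n) = run()
    println(s"$n rows:\n$local")
  }
}
```

BlockMatrix.multiply requires the left operand's colsPerBlock to equal the right operand's rowsPerBlock; transpose swaps the two block sizes, so multiplying a BlockMatrix by its own transpose always satisfies that requirement.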