Tags: scala, matrix, apache-spark, apache-spark-mllib, scala-breeze

Sequentially updating columns of a Matrix RDD


I'm having philosophical issues with the RDDs used in mllib.linalg. In numerical linear algebra one usually wants mutable data structures, but in Spark everything (every RDD) is immutable. I'd like to know whether there is a way around this, specifically for the following situation:

import org.apache.spark.mllib.linalg._
import breeze.numerics._

val theta = constants.Pi / 64
val N = 1000
val Gk: Matrix = Matrices.dense(2, 2, Array(
                               cos(theta), sin(theta),
                               -sin(theta), cos(theta))
                               )
val x0: Vector = Vectors.dense(0.0, 1.0)
var xk = DenseMatrix.zeros(2, N + 1)

Thinking sequentially, I'd like to set the first column of xk to x0, which in Scala/Breeze is normally done with xk(::, 0) := x0, and then fill the remaining columns with

for (k <- 0 to N - 1) {
    xk(::, k + 1) := Gk * xk(::, k)
}

But mllib.linalg.Matrices defines no comparable (apply-like) method for this. Is merely accessing a column (or row) at odds with immutability? What if I use RowMatrix? Can I access or update rows then?

My matrices can be local (as above) or distributed, and I'd like to know whether, in general, the process above can be done in a functional way.

I'd appreciate any comment or help.


Solution

  • So far, I have found answers to a couple of my questions, though the "philosophical" ones remain.

    First, I figured out that I could import breeze.linalg._ and exploit the mutability of Breeze matrices as I did before, but that computation might not run in a fully distributed way.
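    For reference, the sequential update from the question might look roughly like this with Breeze types only. This is a minimal sketch, assuming Breeze is on the classpath and that all data fits on a single machine; the names theta, N, Gk, x0, and xk are taken from the question, and math.cos/math.sin stand in for the breeze.numerics functions used there.

    ```scala
    import breeze.linalg.{DenseMatrix, DenseVector}

    val theta = math.Pi / 64
    val N = 1000

    // Column-major data, mirroring the Matrices.dense call in the question
    val Gk = new DenseMatrix[Double](2, 2, Array(
      math.cos(theta), math.sin(theta),
      -math.sin(theta), math.cos(theta)))
    val x0 = DenseVector(0.0, 1.0)

    // Breeze matrices are mutable, so each column can be written in place
    val xk = DenseMatrix.zeros[Double](2, N + 1)
    xk(::, 0) := x0
    for (k <- 0 until N) {
      xk(::, k + 1) := Gk * xk(::, k)
    }
    ```

    Everything here lives in local memory; nothing in this sketch involves an RDD.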

    Second, a functional alternative to the loop above is recursion (ideally tail recursion), for example:

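    import scala.annotation.tailrec
    import org.apache.spark.mllib.linalg.{Matrix, Vector}

    // Returns (M^n x, the iterates M x, ..., M^n x concatenated into one flat array).
    def nMultiply(x: Vector, M: Matrix, n: Int): (Vector, Array[Double]) = {
        // acc is the current iterate, k the number of steps left,
        // store the iterates collected so far (x itself is not included).
        @tailrec
        def loop(acc: Vector, k: Int, store: Array[Double]): (Vector, Array[Double]) = {
            if (k <= 0) (acc, store)
            else {
                // One multiplication per step; the recursive call is in tail position.
                val res: Vector = M.multiply(acc)
                loop(res, k - 1, store ++ res.toArray)
            }
        }
        loop(x, n, Array.empty[Double])
    }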
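    The same recurrence can also be written without explicit recursion by unfolding it with Iterator.iterate. The following is only a sketch in plain Scala: Vec, Mat, multiply, and iterates are hypothetical stand-ins introduced for illustration, not MLlib or Breeze types, so that the snippet runs without Spark.

    ```scala
    // Hypothetical stand-ins for vector and matrix types (not MLlib API)
    type Vec = Seq[Double]
    type Mat = Seq[Seq[Double]] // stored as a sequence of rows

    // Dense matrix-vector product: one dot product per row
    def multiply(m: Mat, v: Vec): Vec =
      m.map(row => row.zip(v).map { case (a, b) => a * b }.sum)

    // Returns x0, M x0, ..., M^n x0 as an immutable sequence of "columns"
    def iterates(m: Mat, x0: Vec, n: Int): Seq[Vec] =
      Iterator.iterate(x0)(v => multiply(m, v)).take(n + 1).toVector

    // Usage with the rotation from the question
    val theta = math.Pi / 64
    val rotation: Mat = Seq(
      Seq(math.cos(theta), -math.sin(theta)),
      Seq(math.sin(theta), math.cos(theta)))
    val columns = iterates(rotation, Seq(0.0, 1.0), 1000)
    ```

    No value is mutated here: each step produces a new vector from the previous one, and the result collects all N + 1 columns, including x0.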