
How to find the mean of same cells in an array of Breeze Matrices in spark scala?


I have an Array[DenseMatrix[Double]] and I want to find the mean of the corresponding cells across the matrices. For example:

Array[0]: 
  +---+---+
  | 1 | 2 |
  +---+---+ 
  | 2 | 3 |
  +---+---+

Array[1]: 
  +---+---+
  | 1 | 1 |
  +---+---+ 
  | 3 | 1 |
  +---+---+

Array[2]:
  +---+---+
  | 2 | 3 |
  +---+---+ 
  | 4 | 1 |
  +---+---+

Result: DenseMatrix: 
  +------+------+
  | 1.33 |  2   |
  +------+------+ 
  |  3   | 1.67 |
  +------+------+

This is not an RDD, as I want this code to run on the driver.

I am new to Spark and Scala, and all I can think of is something like:

  val ar = rdd.collect().foreach(x=> {
    val matr = DenseMatrix.zeros[Double](C,2)
    matr := x/M
    matr
  })

But I don't know if it is correct, as I think it is a closure. Additionally, it expects a DenseMatrix[Double] return type, but I get an error, because if the RDD is empty I don't have one to return. Any ideas?


Solution

  • When using Breeze matrices you can use + for element-wise addition of two matrices of the same dimensions. That means the only thing you need to do is add all the matrices together and then divide by the number of matrices. It can be done as follows:

    import breeze.linalg.DenseMatrix
    
    val arr = Array(new DenseMatrix(2, 2, Array(1.0,2,2,3)), 
            new DenseMatrix(2, 2, Array(1.0,3,1,1)),
            new DenseMatrix(2, 2, Array(2.0,4,3,1)))
    
    val dm: DenseMatrix[Double] = arr.reduce(_ + _).map(_ / arr.length)
    

    The resulting matrix will have the mean of the same cells.
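    The reduce-then-divide pattern does not actually depend on Breeze. As an illustrative sketch (the object and method names here are made up for the example), the same computation on plain Scala arrays, with each matrix represented by its flattened column-major values, looks like this:

    ```scala
    // Sketch: element-wise mean over an Array of equally-sized Double arrays,
    // each array standing in for a matrix's values in column-major order.
    object CellMean {
      def mean(arrs: Array[Array[Double]]): Array[Double] =
        arrs
          .reduce((a, b) => a.zip(b).map { case (x, y) => x + y }) // element-wise sum
          .map(_ / arrs.length)                                    // divide by the number of matrices

      def main(args: Array[String]): Unit = {
        // The three 2x2 matrices from the question, flattened column-major
        val arrs = Array(
          Array(1.0, 2, 2, 3),
          Array(1.0, 3, 1, 1),
          Array(2.0, 4, 3, 1)
        )
        println(mean(arrs).mkString(", "))
      }
    }
    ```

    This prints the four cell means (1.33..., 3.0, 2.0, 1.66...) in column-major order, matching the result matrix above.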


    This is also possible when using Spark's ml.linalg.DenseMatrix; however, it is a bit more complicated, since that class does not define matrix addition. You can instead sum the underlying values arrays element-wise:

    val numRows = arr.head.numRows
    val numCols = arr.head.numCols
    val values = arr.map(_.values)
      .reduce((a, b) => (a, b).zipped.map(_ + _))
      .map(_ / arr.length)
    
    val dm = new DenseMatrix(numRows, numCols, values)
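    Since the question notes that the collected RDD may be empty (in which case there is no matrix to return), one way to handle that is to return an Option. This is a sketch in plain Scala under the same flattened-array representation; reduceOption is real standard-library API, while the surrounding helper is illustrative:

    ```scala
    // Sketch: return None when the input is empty instead of failing in reduce.
    object SafeCellMean {
      def meanOpt(arrs: Array[Array[Double]]): Option[Array[Double]] =
        arrs
          .reduceOption((a, b) => a.zip(b).map { case (x, y) => x + y }) // None if arrs is empty
          .map(sum => sum.map(_ / arrs.length))

      def main(args: Array[String]): Unit = {
        println(meanOpt(Array.empty))                                // None
        println(meanOpt(Array(Array(2.0, 4.0), Array(4.0, 0.0))).map(_.toList))
      }
    }
    ```

    The caller then decides what an empty result means, rather than the mean function having to invent a matrix.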