scala hadoop matrix scalding

Transforming from native matrix format, Scalding


This question is related to the question "Transforming matrix format, scalding".

Now I want to do the reverse operation. I can do it like this:

Tsv(in, ('row, 'col, 'v))
  .read
  .groupBy('row) { _.sortBy('col).mkString('v, "\t") }
  .mapTo(('row, 'v) -> ('c)) { res : (Long, String) =>
    val (row, v) = res
    v }
  .write(Tsv(out))

But here we run into a problem with zeros. As we know, Scalding skips fields with zero values. For example, take this matrix:

1   0   8   
4   5   6   
0   8   9

In Scalding format it is:

1   1   1
1   3   8
2   1   4
2   2   5
2   3   6
3   2   8
3   3   9
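
To make the format concrete, here is a minimal plain-Scala sketch (no Scalding involved) that produces the same triples from the dense matrix. The helper name `toSparse` is hypothetical, and the 1-based row and column numbering is taken from the listing above.

```scala
// Hypothetical helper, not part of Scalding: converts a dense matrix into
// 1-based (row, col, value) triples, skipping every zero entry.
def toSparse(dense: Seq[Seq[Int]]): Seq[(Int, Int, Int)] =
  for {
    (row, r) <- dense.zipWithIndex
    (v, c)   <- row.zipWithIndex
    if v != 0
  } yield (r + 1, c + 1, v)

val dense  = Seq(Seq(1, 0, 8), Seq(4, 5, 6), Seq(0, 8, 9))
val sparse = toSparse(dense)
```

Because the zeros are never stored, the column position of each remaining value only survives in the `col` field of the triple.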

Using the function I wrote above, we can only get:

1   8
4   5   6
8   9

And that's incorrect. So how can I deal with it? I see two possible options:

  1. Find a way to add the zeros (actually, I don't know how to insert data)
  2. Write my own operations on my own matrix format (not preferable, because I'm interested in Scalding's matrix operations and don't want to write all of them myself)

Maybe there are some methods that let me avoid skipping zeros in the matrix?


Solution

  • Scalding stores a sparse representation of the data. If you want to output a dense matrix (first of all, that won't scale, because at some point the rows will be bigger than can fit in memory), you will need to enumerate all the rows and columns:

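    // First, I highly suggest you use the TypedPipe API, as it is generally
    // easier to get big jobs right with it. The code below uses the fields API.
    
    val mat: Pipe = ??? // your matrix, with fields 'row1, 'col1, 'val1
    def zero: V = ???   // the zero of your value type V
    // Enumerate every row and column index; adjust the ranges to your matrix size
    val rows = IterableSource(0 to 1000, 'row)
    val cols = IterableSource(0 to 2000, 'col)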
    rows.crossWithTiny(cols)
      .leftJoinWithSmaller(('row, 'col) -> ('row1, 'col1), mat)
      .map('val1 -> 'val1) { v: V =>
        if(v == null) // this value should be 0 in your type:
          zero
        else
          v
      }
      .groupBy('row) { 
        _.toList[(Int, V)](('col, 'val1) -> 'cols)
      }
      .map('cols -> 'cols) { cols: List[(Int, V)] =>
        cols.sortBy(_._1).map(_._2).mkString("\t")
      }
      .write(TypedTsv[(Int, String)]("output"))
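
The same idea (enumerate every row/column pair, then fall back to zero where the sparse data has no entry) can be sketched on in-memory collections in plain Scala. This is only an illustration of the logic, not Scalding code: the helper name `toDenseRows` and its parameters are hypothetical, and it assumes 1-based indices and `Int` values as in the question's example.

```scala
// Hypothetical helper: rebuilds dense, tab-separated rows from 1-based
// (row, col, value) triples. numRows and numCols must be supplied by the
// caller, because the sparse data alone cannot reveal trailing zero columns.
def toDenseRows(
    triples: Seq[(Int, Int, Int)],
    numRows: Int,
    numCols: Int,
    zero: Int = 0
): Seq[String] = {
  // Index the sparse entries by (row, col), the in-memory analogue of the join key
  val cells: Map[(Int, Int), Int] =
    triples.map { case (r, c, v) => (r, c) -> v }.toMap
  // Enumerate every position; a missing entry becomes the zero value
  (1 to numRows).map { r =>
    (1 to numCols).map(c => cells.getOrElse((r, c), zero)).mkString("\t")
  }
}

val triples = Seq((1, 1, 1), (1, 3, 8), (2, 1, 4), (2, 2, 5), (2, 3, 6), (3, 2, 8), (3, 3, 9))
val denseRows = toDenseRows(triples, numRows = 3, numCols = 3)
```

Here `cells.getOrElse` plays the role of the left join followed by the null check: every position is visited, and positions absent from the sparse input receive `zero`.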