I am new to Spark and the Scala programming language. My input is a CSV file, and I need to build an inverted index on the values in that file, as explained in the example below.
Input: file.csv
attr1, attr2, attr3
1, AAA, 23
2, BBB, 23
3, AAA, 27
Output format: value -> (rowid, columnid) pairs
For example: AAA -> ((1,2),(3,2))
27 -> (3,3)
I have started with the following code, but I am stuck after that. Kindly help.
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Invert Me!").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val txtFilePath = "/home/person/Desktop/sample.csv"
    val txtFile = sc.textFile(txtFilePath)
    val nRows = txtFile.count()
    val data = txtFile.map(line => line.split(",").map(elem => elem.trim()))
    val nCols = data.first().length // first() avoids collecting the whole file to the driver
  }
}
Code that preserves your style could look like this:
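import org.apache.spark.rdd.RDD

// Broadcast the header row so every task can look up the column names
val header = sc.broadcast(data.first())
// Drop the header (index 0) and emit one (value, (column, rowIndex)) pair per cell
val cells = data.zipWithIndex().filter(_._2 > 0).flatMap { case (row, index) =>
  row.zip(header.value).map { case (value, column) => (value, (column, index)) }
}
// Gather all (column, rowIndex) pairs for each distinct cell value
val index: RDD[(String, Vector[(String, Long)])] =
  cells.aggregateByKey(Vector.empty[(String, Long)])(_ :+ _, _ ++ _)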
Here, index should contain the desired mapping from each cell value to its (ColumnName, RowIndex) pairs.
The underscores in the methods above are just shorthand for lambdas; the same code could be written more verbosely as:
val cellsVerbose = data.zipWithIndex().flatMap {
  case (_, 0L) => Seq.empty[(String, (String, Long))] // skip the header row (index 0)
  case (row, index) => row.zip(header.value).map {
    case (value, column) => (value, (column, index))
  }.toSeq
}
val indexVerbose: RDD[(String, Vector[(String, Long)])] =
  cellsVerbose.aggregateByKey(zeroValue = Vector.empty[(String, Long)])(
    seqOp = (keys, key) => keys :+ key,
    combOp = (keysA, keysB) => keysA ++ keysB)
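The question asks for (rowid, columnid) pairs rather than (ColumnName, RowIndex). As a rough sketch of that variant, here is the same zipWithIndex / flatMap / group-by-value logic on plain Scala collections, so it can be tried without a Spark context. The names InvertedIndexSketch and invert are hypothetical, and the 1-based column numbering is an assumption taken from the example output in the question.

```scala
object InvertedIndexSketch {
  // Hypothetical helper: builds value -> Vector((rowId, columnId)) from raw CSV lines.
  // Assumes the first line is the header and that column ids are 1-based.
  def invert(lines: Seq[String]): Map[String, Vector[(Int, Int)]] = {
    // Split each line on commas and trim the cells, as in the question's code
    val rows = lines.map(_.split(",").map(_.trim))
    // The header has index 0, so after drop(1) the data rows are numbered from 1
    val cells = rows.zipWithIndex.drop(1).flatMap { case (row, rowId) =>
      row.zipWithIndex.map { case (value, colIdx) => (value, (rowId, colIdx + 1)) }.toSeq
    }
    // Group the pairs by cell value, keeping them in encounter order
    cells.groupBy(_._1).map { case (value, hits) => (value, hits.map(_._2).toVector) }
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq("attr1, attr2, attr3", "1, AAA, 23", "2, BBB, 23", "3, AAA, 27")
    val idx = invert(sample)
    println(idx("AAA")) // Vector((1,2), (3,2))
    println(idx("27"))  // Vector((3,3))
  }
}
```

In the Spark version, the same change amounts to replacing row.zip(header.value) with row.zipWithIndex and emitting (index, columnIndex + 1) instead of (column, index).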