
Build an inverted index in a Spark application using Scala


I am new to Spark and to the Scala programming language. My input is a CSV file, and I need to build an inverted index on the values in that file, as shown in the example below.

Input: file.csv

attr1, attr2, attr3
1,     AAA,    23
2,     BBB,    23
3,     AAA,    27

Output format: value -> (rowid, columnid) pairs
For example: AAA -> ((1,2),(3,2))
             27  -> (3,3)
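
To make the target concrete, the mapping described above can be sketched with plain Scala collections, without Spark. This is only a minimal illustration: it assumes the three sample rows are already split and trimmed, that the header has been dropped, and that row and column ids are 1-based positions. The object name InvertedIndexSketch is hypothetical.

```scala
object InvertedIndexSketch {
  // Sample data rows from the question (header excluded, values already trimmed).
  val rows: Seq[Seq[String]] = Seq(
    Seq("1", "AAA", "23"),
    Seq("2", "BBB", "23"),
    Seq("3", "AAA", "27")
  )

  // Emit one (value, (rowId, columnId)) pair per cell, with both ids 1-based
  // (an assumption), then group the pairs by cell value.
  val index: Map[String, Seq[(Int, Int)]] =
    (for {
      (row, r)   <- rows.zipWithIndex
      (value, c) <- row.zipWithIndex
    } yield value -> ((r + 1, c + 1)))
      .groupBy(_._1)
      .map { case (value, pairs) => value -> pairs.map(_._2) }

  def main(args: Array[String]): Unit =
    index.toSeq.sortBy(_._1).foreach { case (value, pairs) =>
      println(s"$value -> ${pairs.mkString("(", ",", ")")}")
    }
}
```

With this sketch, looking up "AAA" yields the pairs (1,2) and (3,2), and "27" yields (3,3), which matches the format described above.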

I started with the following code, but I am stuck after this point. How should I continue?

import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Invert Me!").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val txtFilePath = "/home/person/Desktop/sample.csv"

    val txtFile = sc.textFile(txtFilePath)
    val nRows = txtFile.count()

    // Split each line on commas and trim the padding around every cell
    val data = txtFile.map(line => line.split(",").map(elem => elem.trim()))
    // first() fetches a single row instead of collecting the whole RDD
    val nCols = data.first().length
  }
}

Solution

  • Code that preserves your style could look like this:

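    import org.apache.spark.rdd.RDD

    // Broadcast the header row so every task can look up column names
    val header = sc.broadcast(data.first())

    // zipWithIndex is 0-based, so index 0 is the header row and is filtered out
    val cells = data.zipWithIndex().filter(_._2 > 0).flatMap { case (row, index) =>
      row.zip(header.value).map { case (value, column) => value -> (column, index) }
    }

    // Collect all (column, row index) pairs that share the same cell value
    val index: RDD[(String, Vector[(String, Long)])] =
      cells.aggregateByKey(Vector.empty[(String, Long)])(_ :+ _, _ ++ _)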
    

    Here index should contain the desired mapping from each cell value to its (ColumnName, RowIndex) pairs.

    The underscores in the calls above are just shorthand for lambda parameters; the same code can be written more explicitly as

    val cellsVerbose = data.zipWithIndex().flatMap {
      // zipWithIndex is 0-based, so the header row has index 0
      case (_, 0L) => Seq.empty[(String, (String, Long))] // skipping header row
      case (row, index) => row.zip(header.value).toSeq.map {
        case (value, column) => value -> (column, index)
      }
    }
    
    
    val indexVerbose: RDD[(String, Vector[(String, Long)])] =
      cellsVerbose.aggregateByKey(zeroValue = Vector.empty[(String, Long)])(
        seqOp = (keys, key) => keys :+ key,
        combOp = (keysA, keysB) => keysA ++ keysB)
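
    To show what seqOp and combOp each contribute, here is a small sketch in plain Scala that mimics the two phases on in-memory data. It is not Spark's implementation: the helper aggregateByKey below is a hypothetical stand-in, and the split of the sample cells into two "partitions" is an assumption made for illustration.

```scala
object AggregateByKeySketch {
  type Cell = (String, (String, Long)) // cell value -> (column name, row index)

  // Hypothetical helper that mimics the two phases on in-memory "partitions".
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Phase 1: inside each partition, fold every value into a per-key accumulator.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
      }
    }
    // Phase 2: merge the per-partition accumulators key by key.
    perPartition.foldLeft(Map.empty[K, U]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, u)) =>
        acc.updated(k, acc.get(k).map(combOp(_, u)).getOrElse(u))
      }
    }
  }

  // Some cells from the sample file, split across two assumed partitions.
  val partitions: Seq[Seq[Cell]] = Seq(
    Seq(("AAA", ("attr2", 1L)), ("23", ("attr3", 1L)), ("BBB", ("attr2", 2L))),
    Seq(("23", ("attr3", 2L)), ("AAA", ("attr2", 3L)), ("27", ("attr3", 3L)))
  )

  val index: Map[String, Vector[(String, Long)]] =
    aggregateByKey(partitions, Vector.empty[(String, Long)])(_ :+ _, _ ++ _)
}
```

    In this sketch seqOp appends one (column, row index) pair to the vector kept for a key within a partition, and combOp concatenates the vectors that different partitions built for the same key, so "AAA" ends up with both ("attr2", 1) and ("attr2", 3).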