mongodb · scala · apache-spark · mahout · mahout-recommender

Run Mahout RowSimilarity recommender on MongoDB data


I have managed to run Mahout's rowsimilarity on flat files of the following format:

item-id tag1 tag2 tag3

This has to be run via the CLI, and the output is again flat files. I want to change this so that it reads data from MongoDB (though I'm open to using other DBs too) and then dumps the output to a DB, which can then be picked up by our system.

I've researched this for the past few days and found the following:

  • Will have to write Scala code implementing RowSimilarity
  • Pass it an IndexedDataset object to process the data
  • Convert the output to required format (json/csv)

What I've yet to figure out is how to go about importing data from the DB into an IndexedDataset. I've also read about the RDD format and still can't figure out how to convert JSON data to an RDD which can be used by the RowSimilarity code.

tl;dr: How do I convert MongoDB data so that it can be processed by Mahout/Spark rowsimilarity?

Edit 1: I have found some code that converts Mongo data to an RDD at this link: https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage#scala-example
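
Roughly, the pattern from that page looks like this (the connection URI is a placeholder):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.rdd.RDD
    import org.bson.BSONObject
    import com.mongodb.hadoop.MongoInputFormat

    val mongoConfig = new Configuration()
    mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/db.collection")

    // Each record comes back as a (document _id, BSONObject) pair
    val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
      mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])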

Now I need help converting it to an IndexedDataset so that it can be passed to SimilarityAnalysis.rowSimilarityIDS.

tl;dr: How do I convert an RDD to an IndexedDataset?


Solution

    Below is the answer:

    import org.apache.hadoop.conf.Configuration
    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.math.indexeddataset.Schema
    import org.apache.mahout.sparkbindings
    import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
    import org.apache.spark.rdd.RDD
    import org.bson.BSONObject
    import com.mongodb.hadoop.MongoInputFormat
    
    
    object SparkExample extends App {
      // Create a Mahout distributed context backed by Spark
      implicit val mc = sparkbindings.mahoutSparkContext(masterUrl = "local", appName = "RowSimilarity")

      // Point mongo-hadoop at the source collection
      val mongoConfig = new Configuration()
      mongoConfig.set("mongo.input.uri", "mongodb://hostname:27017/db.collection")

      // Read the collection as an RDD of (document _id, BSONObject) pairs
      val documents: RDD[(Object, BSONObject)] = mc.newAPIHadoopRDD(
        mongoConfig,
        classOf[MongoInputFormat],
        classOf[Object],
        classOf[BSONObject]
      )

      // Map each document to (product_id, array of attribute values),
      // stripping the brackets/quotes from the BSON array's toString form
      // and normalising each value to lowercase-with-hyphens
      val documents_Array: RDD[(String, Array[String])] = documents.map(
        doc1 => (
          doc1._2.get("product_id").toString,
          doc1._2.get("product_attribute_value").toString
            .replace("[ \"", "")
            .replace("\"]", "")
            .split("\" , \"")
            .map(value => value.toLowerCase.replace(" ", "-"))
        )
      )

      // Flatten the arrays so each (product_id, attribute) pair becomes its own row
      val new_doc: RDD[(String, String)] = documents_Array.flatMapValues(x => x)

      // Build the IndexedDataset that SimilarityAnalysis expects
      val myIDs = IndexedDatasetSpark(new_doc)(mc)

      // Output format: rowKey<TAB>columnId:strength columnId:strength ...
      val readWriteSchema = new Schema(
        "rowKeyDelim" -> "\t",
        "columnIdStrengthDelim" -> ":",
        "omitScore" -> false,
        "elementDelim" -> " "
      )

      // Compute row similarities and write the result to HDFS
      SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://hadoop:9000/mongo-hadoop-rowsimilarity", readWriteSchema)(mc)
    }
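
    With this schema, dfsWrite should produce text part files in which each line is the row key, a tab, and then space-separated columnId:strength pairs, roughly like the following (the IDs and strengths here are made-up examples):

    product-a<TAB>product-b:4.6235 product-c:1.0094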
    

    build.sbt:

    name := "scala-mongo"
    version := "1.0"
    scalaVersion := "2.10.6"
    libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
    libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
    
    libraryDependencies ++= Seq(
      "org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
      "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
    )
    libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
    libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
    libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
    libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
    
    resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/"
    resolvers += Resolver.mavenLocal
    

    I've used mongo-hadoop to get the data out of Mongo. Since my data had an array, I had to use flatMapValues to flatten it before passing it to IDS for proper output.
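
    For illustration, here is a minimal sketch of what that flattening step does, with made-up sample values (runnable in spark-shell, where sc is the SparkContext):

    // (product_id, Array(attribute, ...)) pairs, as produced by documents_Array
    val pairs = sc.parallelize(Seq(
      ("p1", Array("red", "cotton")),
      ("p2", Array("blue"))
    ))

    // flatMapValues emits one (product_id, attribute) row per array element:
    // ("p1","red"), ("p1","cotton"), ("p2","blue")
    val flattened = pairs.flatMapValues(tags => tags)
    flattened.collect().foreach(println)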

    PS: I posted the answer here and not on the linked question because this Q&A covers the full scope of getting the data and processing it.