
MongoSpark - convert bson Document to Map[String, Double]


In my MongoDB database I have a collection of the following documents: [screenshot of sample documents omitted]
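Since the screenshot is not available, a hypothetical document of this shape (field names taken from the description below; the imdbID and the index/value pairs are invented for illustration) might look like:

```json
{
  "imdbID": 111161,
  "Title": { "0": 0.5, "7": 1.25 },
  "Decade": { "3": 1.0 },
  "Genres": { "2": 0.8, "5": 0.3 }
}
```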

As one can see, each document has several nested documents (Decade, Title, Plot, Genres, etc.). These are essentially Map representations of SparseVectors that I came up with, and they are generated by another Spark job of mine.

As it turned out, documents of this kind can't be easily read into a Spark DataFrame.

I was wondering how to read such documents into a DataFrame where each subdocument is represented not by a bson Document but by a plain Map[String, Double], because each subdocument is completely arbitrary and contains an arbitrary number of numeric fields.

Is there a way to deal with such documents?


Solution

  • Managed to solve it. Here's how:

    // Driver imports: the MongoSpark connector entry point and the bson Document type
    import com.mongodb.spark.MongoSpark
    import org.bson.Document

    import spark.implicits._

    // Each subdocument is mapped to a Map[Int, Double] (index -> value)
    final case class MovieData(imdbID: Int,
                               Title: Map[Int, Double],
                               Decade: Map[Int, Double],
                               Plot: Map[Int, Double],
                               Genres: Map[Int, Double],
                               Actors: Map[Int, Double],
                               Countries: Map[Int, Double],
                               Writers: Map[Int, Double],
                               Directors: Map[Int, Double],
                               Productions: Map[Int, Double])

    // Converts a bson subdocument into a Map[Int, Double]:
    // the string keys are parsed to Int, the boxed Double values are unboxed
    def documentToMap(doc: Document): Map[Int, Double] = {
      doc.keySet().toArray.map { key =>
        (key.toString.toInt, doc.getDouble(key).toDouble)
      }.toMap
    }

    val movieDataDf = MongoSpark
      .load(sc, moviesDataConfig).rdd.map { (doc: Document) =>
        MovieData(
          doc.get("imdbID").asInstanceOf[Int],
          documentToMap(doc.get("Title").asInstanceOf[Document]),
          documentToMap(doc.get("Decade").asInstanceOf[Document]),
          documentToMap(doc.get("Plot").asInstanceOf[Document]),
          documentToMap(doc.get("Genres").asInstanceOf[Document]),
          documentToMap(doc.get("Actors").asInstanceOf[Document]),
          documentToMap(doc.get("Countries").asInstanceOf[Document]),
          documentToMap(doc.get("Writers").asInstanceOf[Document]),
          documentToMap(doc.get("Directors").asInstanceOf[Document]),
          documentToMap(doc.get("Productions").asInstanceOf[Document])
        )
      }.toDF()
    

    Hopefully the code is self-explanatory: some type casting and conversion did the job. Probably not the most elegant solution, though.
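The conversion at the heart of documentToMap can be illustrated without a MongoDB connection at all. The sketch below (an assumption-laden stand-in: a java.util.LinkedHashMap plays the role of org.bson.Document, since both expose string keys mapped to boxed numeric values) shows the same key and value coercion:

```scala
import scala.jdk.CollectionConverters._

// Stand-in for a bson subdocument such as {"0": 0.5, "7": 1.25}:
// string keys, boxed java.lang.Double values.
def sampleSubdoc(): java.util.Map[String, Object] = {
  val m = new java.util.LinkedHashMap[String, Object]()
  m.put("0", Double.box(0.5))
  m.put("7", Double.box(1.25))
  m
}

// The same coercion documentToMap performs: parse each string key
// to an Int index and unbox each value to a Scala Double.
def toSparseMap(m: java.util.Map[String, Object]): Map[Int, Double] =
  m.asScala.map { case (k, v) =>
    k.toInt -> v.asInstanceOf[java.lang.Double].doubleValue
  }.toMap
```

This works because the subdocument keys are guaranteed (by the job that produced them) to be numeric strings; a non-numeric key would make `k.toInt` throw a NumberFormatException.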