In my MongoDB database I have a collection of the following documents:
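Roughly, a document looks like this (the field names match my real data, but the values below are made up purely for illustration):

{
  "imdbID": 1234567,
  "Title": { "24": 0.83, "112": 0.41 },
  "Decade": { "7": 1.0 },
  "Plot": { "5": 0.12, "318": 0.77 },
  "Genres": { "3": 0.58, "11": 0.58 },
  "Actors": { "1042": 0.5, "2077": 0.5 },
  "Countries": { "2": 1.0 },
  "Writers": { "419": 1.0 },
  "Directors": { "87": 1.0 },
  "Productions": { "14": 1.0 }
}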
As you can see, each document has several nested documents (Decade, Title, Plot, Genres, etc.). These are essentially Map representations of SparseVectors that I came up with, and they are generated by another Spark job of mine.
As it turned out, documents of this kind can't easily be read into a Spark DataFrame.
I was wondering how I could read such documents into a DataFrame where each subdocument is represented not by a BSON Document but by a plain Map[String, Double], since each of those subdocuments is completely arbitrary and contains an arbitrary number of numeric fields.
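To illustrate, loading the collection directly with the connector's default schema inference ends up representing every subdocument as a struct with one field per sampled key rather than as a map. A minimal sketch (moviesDataConfig is my read configuration, rawDf is just an illustrative name):

import com.mongodb.spark.MongoSpark

val rawDf = MongoSpark.load(spark, moviesDataConfig)
rawDf.printSchema()
// Title, Decade, Plot, ... each show up as a struct whose fields are the
// individual numeric keys seen while sampling the collection, not as a Map.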
Is there a way to deal with such documents?
I managed to solve it. Here's how:
import org.bson.Document
import com.mongodb.spark.MongoSpark

import spark.implicits._   // spark is the active SparkSession, sc its SparkContext

// One row per movie; every subdocument becomes a Map[Int, Double] where the
// key is the SparseVector index and the value is the corresponding weight.
final case class MovieData(imdbID: Int,
                           Title: Map[Int, Double],
                           Decade: Map[Int, Double],
                           Plot: Map[Int, Double],
                           Genres: Map[Int, Double],
                           Actors: Map[Int, Double],
                           Countries: Map[Int, Double],
                           Writers: Map[Int, Double],
                           Directors: Map[Int, Double],
                           Productions: Map[Int, Double])

// Turns a subdocument of the form {"<index>": <weight>, ...} into a Map[Int, Double].
def documentToMap(doc: Document): Map[Int, Double] = {
  doc.keySet().toArray.map { key =>
    (key.toString.toInt, doc.getDouble(key).toDouble)
  }.toMap
}

// Load the collection as raw BSON Documents, convert each one into a MovieData
// instance, and let toDF() derive the schema from the case class.
val movieDataDf = MongoSpark
  .load(sc, moviesDataConfig).rdd.map((doc: Document) => {
    MovieData(
      doc.get("imdbID").asInstanceOf[Int],
      documentToMap(doc.get("Title").asInstanceOf[Document]),
      documentToMap(doc.get("Decade").asInstanceOf[Document]),
      documentToMap(doc.get("Plot").asInstanceOf[Document]),
      documentToMap(doc.get("Genres").asInstanceOf[Document]),
      documentToMap(doc.get("Actors").asInstanceOf[Document]),
      documentToMap(doc.get("Countries").asInstanceOf[Document]),
      documentToMap(doc.get("Writers").asInstanceOf[Document]),
      documentToMap(doc.get("Directors").asInstanceOf[Document]),
      documentToMap(doc.get("Productions").asInstanceOf[Document])
    )
  }).toDF()
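For completeness, moviesDataConfig above is just a ReadConfig pointing at the collection. A minimal sketch of it, assuming the connection uri (spark.mongodb.input.uri) is already set on the Spark configuration and using a made-up collection name, plus a quick sanity check of the result:

import com.mongodb.spark.config.ReadConfig

// "movieData" is a placeholder collection name; the uri and database come from
// the default ReadConfig built from the SparkContext configuration.
val moviesDataConfig = ReadConfig(Map("collection" -> "movieData"), Some(ReadConfig(sc)))

// The feature columns should now come back as map<int, double> instead of structs.
movieDataDf.printSchema()
movieDataDf.select("imdbID", "Genres").show(5, truncate = false)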
Hopefully the code is self-explanatory: some type casting and conversion did the job. It's probably not the most elegant solution, though.