I have a process that exports data from a MongoDB collection using mongoexport.
As the documentation mentions, all JSON output is in Strict mode. This means the data will look like this:
"{amount":{"$numberLong":"3"},"count":{"$numberLong":"245"}}
Whereas my Scala case class is defined as:
case class MongoData(amount: Long, count: Long)
Reading the data like this will of course fail:
spark
.read
.json(inputPath)
.as[MongoData]
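The reason is visible in the inferred schema: Spark reads each wrapped value as a struct with a single $numberLong string field rather than as a Long. A quick check (a sketch, assuming the same inputPath) shows the mismatch:

// Inspect what spark.read.json infers from the Strict-mode export;
// each exported Long comes back as struct<$numberLong: string>.
val raw = spark.read.json(inputPath)
raw.printSchema()
// root
//  |-- amount: struct (nullable = true)
//  |    |-- $numberLong: string (nullable = true)
//  |-- count: struct (nullable = true)
//  |    |-- $numberLong: string (nullable = true)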
Is there a way to either export from Mongo without Strict mode, or to import the JSON in Scala without manually restructuring each field into the appropriate type?
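For just these two fields one could unwrap the values by hand (a sketch using the column names above, with spark.implicits._ in scope for the encoder), but that is exactly the kind of per-field restructuring I'd like to avoid:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType
import spark.implicits._

// Manually unwrap each known field and cast the inner string back to Long.
val ds = spark.read.json(inputPath)
  .select(
    col("amount.$numberLong").cast(LongType).as("amount"),
    col("count.$numberLong").cast(LongType).as("count"))
  .as[MongoData]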
I'm now using this as a solution, but it feels somewhat hacky.
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

case class DataFrameExtended(dataFrame: DataFrame) {
  def undoMongoStrict(): DataFrame = {
    // The shape Strict mode uses for 64-bit integers: {"$numberLong": "<value>"}.
    val numberLongType = StructType(List(StructField("$numberLong", StringType, true)))

    // Walk the schema and rebuild every column: unwrap $numberLong structs into
    // plain Longs, recurse into other structs, and pass everything else through.
    def restructure(fields: Array[StructField], nesting: List[String] = Nil): List[Column] = {
      fields.flatMap(field => {
        val fieldPath = nesting :+ field.name
        val fieldPathStr = fieldPath.mkString(".")
        field.dataType match {
          case dt: StructType if dt == numberLongType =>
            Some(col(s"$fieldPathStr.$$numberLong").cast(LongType).as(field.name))
          case dt: StructType =>
            Some(struct(restructure(dt.fields, fieldPath): _*).as(field.name))
          case _ => Some(col(fieldPathStr).as(field.name))
          // case dt: ArrayType => // @todo handle other DataTypes, e.g. Array
        }
      }).toList
    }

    dataFrame.select(restructure(dataFrame.schema.fields): _*)
  }
}
// Implicit conversion so that undoMongoStrict() can be called directly on a DataFrame.
implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}
spark
.read
.json(inputPath)
.undoMongoStrict()
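With the implicit conversion in scope the unwrapped schema matches the case class again, so the Dataset conversion from the question now works (assuming spark.implicits._ is imported for the encoder):

import spark.implicits._

// Same read as above, followed by the originally intended conversion.
val ds = spark
  .read
  .json(inputPath)
  .undoMongoStrict()
  .as[MongoData]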