I'm new to both Spark and Scala. I'm trying to upload a CSV file to MongoDB using a Spark job written in Scala.
During the upload, the job fails with the following error:
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class .
The path to the input file is passed as a command-line argument when the job is run.
I've been stuck on this issue for the past two days, so any help getting past it is appreciated.
Thanks.
I tried the same approach for uploading to Elasticsearch, and it worked like a charm.
import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config
object MongoUpload {
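  // Connection settings (actual values redacted in the question)
  val host = "<host>"
  val user = "<user>"
  val pwd = "<password>"
  val database = "<db>"
  val collection = "<collection>"
  // The s prefix is required for the ${...} placeholders to be interpolated
  val uri = s"mongodb://${user}:${pwd}@${host}/"
  val NOW = java.time.LocalDate.now.toString

  def main(args: Array[String]): Unit = {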
    val spark = SparkSession
      .builder()
      .appName("Mongo-Test-Upload")
      .config("spark.mongodb.output.uri", uri)
      .getOrCreate()
    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .saveToMongoDB(
        WriteConfig(
          Map(
            "uri" -> uri,
            "database" -> database,
            "collection" -> collection
          )
        )
      )
  }

  def toEligibility(row: Row): Eligibility =
    Eligibility(
      row.getAs[String]("DATE_OF_BIRTH"),
      row.getAs[String]("GENDER"),
      row.getAs[String]("INDIVIDUAL_ID"),
      row.getAs[String]("PRODUCT_NAME"),
      row.getAs[String]("STATE_CODE"),
      row.getAs[String]("ZIPCODE"),
      NOW
    )
}
case class Eligibility(
  dateOfBirth: String,
  gender: String,
  recordId: String,
  ProductIdentifier: String,
  stateCode: String,
  zipCode: String,
  updateDate: String
)
The Spark job fails with the following error:
Caused by: org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class Eligibility
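The error occurs because saveToMongoDB on an RDD encodes each element with the MongoDB Java driver's codec registry, which has codecs for types such as org.bson.Document but none for an arbitrary case class like Eligibility.
You can either map each row to a Document of the desired format, or convert the RDD to a Dataset and then save it, e.g.: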
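import spark.implicits._ // provides the Encoder that toDS() needs

spark
  .read
  .format("csv")
  .option("header", "true")
  .load(args(0))
  .rdd
  .map(toEligibility)
  .toDS() // Dataset[Eligibility]: the connector builds each document from the schema
  .write // write is a method without parentheses in Scala
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("append") // the default mode (ErrorIfExists) fails if the collection already exists
  .options(Map("uri" -> uri, "database" -> database, "collection" -> collection))
  .save()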
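For the first option, here is a minimal sketch, assuming the field names of the Eligibility case class are the desired document keys. The helper object EligibilityDocument and its toDocument method are hypothetical names introduced for illustration; they convert each Eligibility into an org.bson.Document, a type the default codec registry already knows how to encode. The case class is repeated only so the snippet compiles on its own.

```scala
import org.bson.Document

// Same fields as the Eligibility case class in the question
case class Eligibility(
  dateOfBirth: String,
  gender: String,
  recordId: String,
  ProductIdentifier: String,
  stateCode: String,
  zipCode: String,
  updateDate: String
)

// Hypothetical helper: org.bson.Document has a built-in codec, so an
// RDD[Document] can be passed to saveToMongoDB without a custom codec.
object EligibilityDocument {
  def toDocument(e: Eligibility): Document =
    new Document("dateOfBirth", e.dateOfBirth)
      .append("gender", e.gender)
      .append("recordId", e.recordId)
      .append("ProductIdentifier", e.ProductIdentifier)
      .append("stateCode", e.stateCode)
      .append("zipCode", e.zipCode)
      .append("updateDate", e.updateDate)
}
```

In the original job, the helper would slot in between toEligibility and the existing save call: `.map(toEligibility).map(EligibilityDocument.toDocument).saveToMongoDB(WriteConfig(Map("uri" -> uri, "database" -> database, "collection" -> collection)))`. Going through the case class keeps the CSV-to-field mapping in one place; building the Document directly from the Row would work just as well.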