Tags: mongodb, scala, apache-spark, codec

Scala Spark job to upload CSV file data to MongoDB fails due to CodecConfigurationException


I'm new to both Spark and Scala. I'm trying to upload a CSV file to MongoDB using a Spark job written in Scala.

The job execution fails with the following error:

org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class .

The path to the input file is passed as an argument during execution.

I've been stuck on this issue for the past two days. Any help to overcome it is appreciated.

Thanks.

I have tried the same approach for uploading to Elasticsearch and it worked like a charm.

import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config


object MongoUpload {
  // Placeholder connection values; replace with your own.
  val host = "<host>"
  val user = "<user>"
  val pwd = "<password>"
  val database = "<db>"
  val collection = "<collection>"
  // Note the s interpolator; without it, ${user} etc. are not substituted.
  val uri = s"mongodb://${user}:${pwd}@${host}/"
  val NOW = java.time.LocalDate.now.toString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Mongo-Test-Upload")
      .config("spark.mongodb.output.uri", uri)
      .getOrCreate()

    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .saveToMongoDB(
        WriteConfig(
            Map(
                "uri" -> uri,
                "database" -> database,
                "collection" -> collection
            )
        )
      )
  }


  def toEligibility(row: Row): Eligibility =
    Eligibility(
      row.getAs[String]("DATE_OF_BIRTH"),
      row.getAs[String]("GENDER"),
      row.getAs[String]("INDIVIDUAL_ID"),
      row.getAs[String]("PRODUCT_NAME"),
      row.getAs[String]("STATE_CODE"),
      row.getAs[String]("ZIPCODE"),
      NOW
    )
}

case class Eligibility (
  dateOfBirth: String,
  gender: String,
  recordId: String,
  productIdentifier: String,
  stateCode: String,
  zipCode: String,
  updateDate: String
)

The Spark job fails with the following error:

    Caused by: org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class Eligibility


Solution

  • The Mongo Spark connector has no codec registered for an arbitrary case class like Eligibility. You can either map each row to a BSON Document of the desired format, or convert the RDD to a Dataset and save that instead, e.g.:

        import spark.implicits._

        spark
          .read
          .format("csv")
          .option("header", "true")
          .load(args(0))
          .rdd
          .map(toEligibility)
          .toDS()
          .write
          .format("com.mongodb.spark.sql.DefaultSource")
          .options(Map("uri" -> uri, "database" -> database, "collection" -> collection))
          .save()
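The other option mentioned above, mapping each row to a BSON Document rather than a case class, could be sketched as follows. This is a minimal sketch, not the asker's code: the field names mirror the Eligibility fields from the question, and the uri/database/collection values are assumed to be defined as in the original object.

```scala
import org.apache.spark.sql.Row
import org.bson.Document
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig

// Build a BSON Document directly, so the connector's default codec
// registry can serialize it without any custom codec for Eligibility.
def toDocument(row: Row): Document =
  new Document()
    .append("dateOfBirth", row.getAs[String]("DATE_OF_BIRTH"))
    .append("gender", row.getAs[String]("GENDER"))
    .append("recordId", row.getAs[String]("INDIVIDUAL_ID"))
    .append("productIdentifier", row.getAs[String]("PRODUCT_NAME"))
    .append("stateCode", row.getAs[String]("STATE_CODE"))
    .append("zipCode", row.getAs[String]("ZIPCODE"))
    .append("updateDate", java.time.LocalDate.now.toString)

// An RDD[Document] can then be saved with the same call as before:
// spark.read.format("csv").option("header", "true").load(args(0))
//   .rdd
//   .map(toDocument)
//   .saveToMongoDB(WriteConfig(Map(
//     "uri" -> uri, "database" -> database, "collection" -> collection)))
```

This keeps the original saveToMongoDB call unchanged; only the element type of the RDD changes from Eligibility to Document.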