Tags: mongodb, scala, apache-spark, apache-spark-sql

Spark Mongodb Connector Scala - Missing database name


I'm stuck on a weird issue: I'm trying to connect Spark to MongoDB locally using the MongoDB Spark connector.

Apart from setting up Spark, I'm using the following code:

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

// Load the movie rating data from Mongo DB
val movieRatings = MongoSpark.load(sc, readConfig).toDF()

movieRatings.show(100)

However, I get a runtime error:

java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property.

It is thrown on the line where I set up readConfig. I don't get why it complains about an unset URI when I clearly have a uri property in the Map. I must be missing something.


Solution

  • You can set the connection properties on the SparkSession, as mentioned here:

    val spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
        .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
        .getOrCreate()
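
The same properties can also be supplied outside the application, for example on the spark-submit command line. A sketch, assuming a 2.x connector build; the package coordinate and my-app.jar are placeholders to adapt to your setup:

```shell
# Sketch: passing the connector properties as --conf flags instead of in code.
# The connector package version and my-app.jar are example placeholders.
spark-submit \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 \
  --conf "spark.mongodb.input.uri=mongodb://localhost:27017/movie_db.movie_ratings" \
  --conf "spark.mongodb.input.readPreference.name=secondaryPreferred" \
  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/movie_db.movie_ratings" \
  my-app.jar
```

This keeps connection details out of the code, which is convenient when the same job runs against different environments.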
    

    Create a DataFrame using that config:

    val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"))
    val df = MongoSpark.load(spark, readConfig)
    

    Write df back to MongoDB:

    MongoSpark.save(
      df.write
        .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
        .mode("overwrite"))
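
Equivalently, the connector registers a Spark SQL data source, so the plain DataFrameWriter API can be used instead of MongoSpark.save. A sketch, assuming the 2.x connector's DefaultSource is on the classpath:

```scala
// Sketch: writing through the standard DataFrameWriter API; the connector's
// data source class resolves the target database/collection from the uri option.
df.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
  .mode("overwrite")
  .save()
```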
    

    In your code, the spark.mongodb.input. / spark.mongodb.output. prefixes are missing from the config keys:

    val readConfig = ReadConfig(Map(
        "spark.mongodb.input.uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", 
        "spark.mongodb.input.readPreference.name" -> "secondaryPreferred"), 
        Some(ReadConfig(sc)))
    
    val writeConfig = WriteConfig(Map(
        "spark.mongodb.output.uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))
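
    For completeness: the error in the question is raised by the Some(ReadConfig(sc)) default, which reads the spark.mongodb.input.* properties from the SparkContext's configuration. Setting those properties on the SparkConf before creating the context is another way to make it resolve. A sketch under the same assumptions (local Spark, 2.x connector on the classpath):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Sketch: putting the connector properties on the SparkConf itself,
// so ReadConfig(sc) can resolve the database and collection.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("MongoSparkConnectorIntro")
  .set("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
  .set("spark.mongodb.input.readPreference.name", "secondaryPreferred")
  .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
val sc = new SparkContext(conf)

// ReadConfig(sc) now finds the database/collection in the context's config
val movieRatings = MongoSpark.load(sc, ReadConfig(sc)).toDF()
movieRatings.show(100)
```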