
Adding Mongo config to active spark session


I am trying to add the configurations to an active Spark session. Below is my code:

val spark = SparkSession.getActiveSession.get
spark.conf.set("spark.mongodb.input.uri",
  "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
spark.conf.set("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner")

import com.mongodb.spark._

val sc = spark.sparkContext
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
customRdd.collect().foreach(println)

But I am getting an error:

java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property

But when I write the code like this:

  val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
    // .config("spark.mongodb.output.uri", "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
    .config("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner")
    .getOrCreate()
  val sc = spark.sparkContext
  val customRdd = MongoSpark.load(sc)
  println(customRdd.count())
  println(customRdd.first.toJson)
  customRdd.collect().foreach(println)

my code executes fine.

Kindly let me know what changes I need to make in the first snippet.


Solution

  • You can define the SparkSession with a SparkConf like this (I don't know if this helps you):

      // Build the session from a SparkConf so the properties land on the
      // SparkContext's configuration, not just the runtime SQL conf
      def sparkSession(conf: SparkConf): SparkSession = SparkSession
        .builder()
        .config(conf)
        .getOrCreate()

      val sparkConf = new SparkConf()
      sparkConf.set("prop", "value")
      val ss = sparkSession(sparkConf)
    

    Or you can try SparkEnv (I'm using SparkEnv in a lot of places to change properties):

      SparkEnv.get.conf.set("prop", "value")
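
    Another option is to hand the settings to the connector directly instead of going through the session conf at all. The snippet below is a sketch (it reuses the URI and partitioner from the question and assumes the MongoDB Spark connector's `ReadConfig` API): `spark.conf.set` only updates the session's runtime SQL conf, which `MongoSpark.load(sc)` does not consult, so building a `ReadConfig` explicitly sidesteps the "Missing database name" error:

      import org.apache.spark.sql.SparkSession
      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.ReadConfig

      val spark = SparkSession.getActiveSession.get

      // Pass the Mongo options explicitly; the connector reads them from
      // this ReadConfig rather than from the SparkContext's conf
      val readConfig = ReadConfig(Map(
        "uri" -> "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin",
        "partitioner" -> "MongoPaginateBySizePartitioner"
      ))

      val customRdd = MongoSpark.load(spark.sparkContext, readConfig)
      println(customRdd.count())

    This keeps the active session untouched, which is useful when you cannot control how the session was originally built.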