apache-spark · spark-streaming · singlestore

What is the correct way of using memSQL Connection object inside call method of Apache Spark code


I have Spark code where the code inside the `call` method reads from a table in the memSQL database. My code opens a new connection object on each call and closes it once the task is done. This works, but it makes the execution time of the Spark job high. What would be a better way to do this so that the job's execution time is reduced?

Thank You.


Solution

  • You can use one connection per partition, like this:

    rdd.foreachPartition { partition =>
      // open a single connection for the whole partition
      val connection = DB.createConnection()
      // materialize the iterator: a partition iterator can only be
      // traversed once, so it must be buffered if it is used again below
      val records = partition.toList
      records.foreach { r =>
        val externalData = connection.read(r.externalId)
        // do something with your data
      }
      DB.save(records)
      connection.close()
    }
    

    If you use Spark Streaming:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // open a single connection for the whole partition
        val connection = DB.createConnection()
        // materialize the iterator before consuming it twice
        val records = partition.toList
        records.foreach { r =>
          val externalData = connection.read(r.externalId)
          // do something with your data
        }
        DB.save(records)
        connection.close()
      }
    }
    

    See the "Design Patterns for using foreachRDD" section of the Spark Streaming Programming Guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
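  • Going one step further, the Spark Streaming guide linked above recommends keeping a lazily initialized, static connection pool in each executor JVM, so that connections are reused across partitions and across batches instead of being reopened per partition. A minimal sketch of that pattern (the `Connection` class here is a stand-in for a real memSQL/JDBC connection, and in production you would back this with a proper pool such as HikariCP):

    ```scala
    import java.util.concurrent.ConcurrentLinkedQueue

    // Placeholder standing in for a real memSQL/JDBC connection.
    class Connection {
      def read(id: String): String = s"data-for-$id"
      def close(): Unit = ()
    }

    // A Scala `object` is initialized lazily, once per JVM, so every
    // task running on the same executor shares this pool.
    object ConnectionPool {
      private val pool = new ConcurrentLinkedQueue[Connection]()

      def getConnection(): Connection = {
        val c = pool.poll()
        if (c != null) c else new Connection()
      }

      // Return the connection to the pool instead of closing it,
      // so a later task on this executor can reuse it.
      def returnConnection(c: Connection): Unit = pool.offer(c)
    }

    // Usage inside a partition:
    // rdd.foreachPartition { partition =>
    //   val connection = ConnectionPool.getConnection()
    //   partition.foreach(r => connection.read(r.externalId))
    //   ConnectionPool.returnConnection(connection)
    // }
    ```

    With this in place, the per-partition code above stays the same except that it borrows and returns a connection rather than creating and closing one.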