Search code examples
scalaapache-sparkdataframeresultset

Using Scala, create DataFrame or RDD from Java ResultSet


I don't want to create a DataFrame or RDD directly using the spark.read method. I want to build a DataFrame or RDD from a Java ResultSet (it has 5,000,00 records). I would appreciate it if you could provide a diligent solution.


Solution

  • First, create the rows using RowFactory. Then convert all the rows into a DataFrame using the SQLContext.createDataFrame method. I hope this helps you too :).

    import java.sql.Connection
    import java.sql.ResultSet

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.RowFactory
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types.LongType
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.StructType
    
    // Rows read from the JDBC result set are buffered on the driver before being
    // handed to Spark, so the whole result must fit in driver memory.
    val rowList = new scala.collection.mutable.ArrayBuffer[Row]

    // The ResultSet is produced by plain Java JDBC. executeQuery is required here:
    // Statement.execute only returns a Boolean flag, not the ResultSet itself.
    val statement = DbConnection.createStatement()
    try {
      val resultSet: ResultSet = statement.executeQuery("Sql")
      try {
        // Walk the result set, turning each record into a two-column Row.
        while (resultSet.next()) {
          // Typed getters keep the values aligned with the schema from getSchema()
          // (column 1 -> LongType, column 2 -> StringType); getObject would return
          // whatever type the JDBC driver picks. The Long is boxed explicitly
          // because RowFactory.create is a Java varargs method.
          rowList += RowFactory.create(Long.box(resultSet.getLong(1)), resultSet.getString(2))
        }
      } finally resultSet.close()
    } finally statement.close() // release JDBC resources even if reading fails

    // Spark entry points; "local[*]" is the master URL set on the configuration.
    val sConf = new SparkConf().setAppName("").setMaster("local[*]")
    val sContext: SparkContext = new SparkContext(sConf)
    val sqlContext: SQLContext = new SQLContext(sContext)

    // Distribute the buffered rows over 2 partitions and attach the schema.
    val DF: DataFrame = sqlContext.createDataFrame(sContext.parallelize(rowList.toSeq, 2), getSchema())
    DF.show() // show the dataframe.
    
    /**
     * Builds the schema used to define the DataFrame columns.
     *
     * @return a two-field [[StructType]]: a non-nullable `COUNT` column of
     *         `LongType` followed by a non-nullable `TABLE_NAME` column of
     *         `StringType`.
     */
    def getSchema(): StructType =
      StructType(
        StructField("COUNT", LongType, nullable = false) ::
          StructField("TABLE_NAME", StringType, nullable = false) :: Nil)