arrays, scala, apache-spark, typesafe

spark scala typesafe config safe iterate over value of a specific column name


I have found a similar post on Stack Overflow. However, it did not solve my issue, which is why I am writing this post.

Aim

The aim is to perform a column projection (projection = keep only selected columns) while loading a SQL table (I use SQL Server).

According to the Scala Cookbook, this is the way to filter columns [using an Array]:

sqlContext.read.jdbc(url,"person",Array("gender='M'"),prop)

However, I do not want to hardcode Array("col1", "col2", ...) inside my Scala code, which is why I am using a Typesafe config file (see below).

Config file

dataset {
    type = sql
    sql {
        url = "jdbc://host:port:user:name:password"
        tablename = "ClientShampooBusinesLimited"
        driver = "driver"
        other = "i have a lot of other single string elements in the config file..."
        columnList = [
            {
                colname = "id"
                colAlias = "identifient"
            }
            {
                colname = "name"
                colAlias = "nom client"
            }
            {
                colname = "age"
                colAlias = "âge client"
            }
        ]
    }
}

Let's focus on 'columnList': the name of the SQL column corresponds exactly to 'colname'. 'colAlias' is a field that I will use later.

data.scala file

lazy val columnList = configFromFile.getList("dataset.sql.columnList")
lazy val dbUrl = configFromFile.getString("dataset.sql.url")
lazy val DbTableName = configFromFile.getString("dataset.sql.tablename")
lazy val DriverName = configFromFile.getString("dataset.sql.driver")

configFromFile is created by myself in another custom class, but that does not matter here. The type of columnList is "ConfigList"; this type comes from Typesafe.
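
For reference only, configFromFile is built roughly like this (the resource name passed to ConfigFactory below is just illustrative, not my exact code):

import com.typesafe.config.{Config, ConfigFactory}

// Illustrative sketch: load the HOCON file shown above from the classpath.
// "dataset" is a placeholder resource basename (i.e. dataset.conf).
lazy val configFromFile: Config = ConfigFactory.load("dataset")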

main file

def loadDataSQL(): DataFrame = {

  val url = datasetConfig.dbUrl
  val dbTablename = datasetConfig.DbTableName
  val dbDriver = datasetConfig.DriverName
  val columns = // I need help to solve this

  /* EDIT 2 March 2017
     This code should not be used. Have a look at the accepted answer.
  */
  sparkSession.read.format("jdbc").options(
    Map("url" -> url,
        "dbtable" -> dbTablename,
        "predicates" -> columns,
        "driver" -> dbDriver))
    .load()
}

So my whole issue is to extract the 'colname' values in order to put them into a suitable array. Can someone help me write the right-hand side of 'val columns'?

Thanks


Solution

  • If you're looking for a way to read the list of colname values into a Scala Array - I think this does it:

    import scala.collection.JavaConverters._
    
    val columnList = configFromFile.getConfigList("dataset.sql.columnList")
    val colNames: Array[String] = columnList.asScala.map(_.getString("colname")).toArray
    

    With the supplied file this would result in Array(id, name, age).

    EDIT: As to your actual goal, I don't know of any option named predicates (nor can I find evidence for one in the sources, using Spark 2.0.2).

    The JDBC data source performs "projection pushdown" based on the columns actually selected in the query used. In other words, only the selected columns are read from the database, so you can use the colNames array in a select immediately following the DataFrame creation, e.g.:

    import org.apache.spark.sql.functions._
    
    sparkSession.read
      .format("jdbc")
      .options(Map("url" -> url, "dbtable" -> dbTablename, "driver" -> dbDriver))
      .load()
      .select(colNames.map(col): _*) // selecting only desired columns
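
    If you also want to apply the colAlias values from the config at some point, one possible follow-up sketch (assuming the DataFrame produced above is bound to a val named projectedDf, a name I'm making up here, and reusing the columnList read earlier):

    import scala.collection.JavaConverters._

    // Hypothetical follow-up: rename each selected column to its configured alias.
    // projectedDf stands for the DataFrame built by the snippet above.
    val aliases: Seq[(String, String)] = columnList.asScala
      .map(c => (c.getString("colname"), c.getString("colAlias")))
      .toSeq

    val renamedDf = aliases.foldLeft(projectedDf) {
      case (df, (name, alias)) => df.withColumnRenamed(name, alias)
    }

    Since withColumnRenamed is a no-op for columns that aren't present, the fold stays safe even if an alias entry doesn't match a selected column.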