Tags: scala, apache-spark, data-migration

How to use select() and map() in Spark with Scala?


I'm writing code to migrate data from MySQL to Cassandra using Spark. I'm trying to generalize it so that, given a conf file, it can migrate any table. I'm stuck at two places:

  1. val dataframe2 = dataframe.select("a","b","c","d","e","f") — after loading the table from MySQL, I want to select only a few columns, and I have the names of these columns as a list. How can that list be used here?
  2. val RDDtuple = dataframe2.map(r => (r.getAs(0), r.getAs(1), r.getAs(2), r.getAs(3), r.getAs(4), r.getAs(5))) — here again, every table may have a different number of columns, so how can this be achieved?

Solution

    1. To use a variable number of columns in select(), your list of columns can be passed like this:

    val columns = List("a", "b", "c", "d")
    val dfSelectedCols = dataFrame.select(columns.head, columns.tail: _*)

    Explanation: the first parameter of DataFrame's select(String, String...) is mandatory, so use columns.head. The rest of the list needs to be converted to varargs using columns.tail: _*.
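Because select(String, String...) is an ordinary varargs method, the head/tail split can be demonstrated without Spark at all. A minimal sketch, where the select below is a hypothetical stand-in with the same shape as DataFrame.select, not the Spark method itself:

```scala
// Hypothetical stand-in mimicking DataFrame.select(col: String, cols: String*)
def select(first: String, rest: String*): Seq[String] = first +: rest

val columns = List("a", "b", "c", "d")

// columns.tail: _* expands the tail of the list into the varargs parameter
val selected = select(columns.head, columns.tail: _*)
// selected contains "a", "b", "c", "d" in order
```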

    2. It's not very clear from your example, but I suppose you are mapping an RDD[Row] into an RDD of tuples, right? Please give more details, and also use meaningful variable names: single-letter names like r are bad choices, especially when there is no explicit typing.