apache-spark, apache-spark-sql, rdd, apache-spark-mllib, apache-spark-dataset

How should I convert an RDD of org.apache.spark.ml.linalg.Vector to Dataset?


I'm struggling to understand how conversion among RDDs, Datasets and DataFrames works. I'm pretty new to Spark, and I get stuck every time I need to move from one data model to another (especially from RDDs to Datasets and DataFrames). Could anyone explain the right way to do it?

As an example, I now have an RDD[org.apache.spark.ml.linalg.Vector] and I need to pass it to a machine learning algorithm, for example KMeans (the Dataset-based Spark MLlib API). So I need to convert it to a Dataset with a single column named "features" that contains Vector-typed rows. How should I do this?


Solution

  • To convert an RDD to a dataframe in Scala, the easiest way is to use toDF(). This function requires the implicits provided by the SparkSession object, which are imported as follows:

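    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // rdd is an existing RDD whose element type has an implicit encoder
    val df = rdd.toDF("features")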
    

    toDF() works on an RDD whose element type has an implicit encoder. When the RDD holds common Scala types (Int, String and so on), the elements are converted implicitly and nothing more is needed. The same is true when the RDD has multiple columns, because its elements are then already tuples. In this special case, however, spark.implicits._ provides no encoder for a bare org.apache.spark.ml.linalg.Vector, so you first need to convert RDD[org.apache.spark.ml.linalg.Vector] to RDD[Tuple1[org.apache.spark.ml.linalg.Vector]]. Note that (Vector) in Scala is just Vector, not a tuple, so each element has to be wrapped in a one-element tuple explicitly:

    val df = rdd.map(Tuple1(_)).toDF("features")
    

    The above will convert the RDD to a dataframe with a single column called features.
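
    The steps above can be put together into an end-to-end sketch that feeds the resulting dataframe to KMeans. The local master, the app name, the sample vectors and the choice of k = 2 are assumptions made for illustration and are not part of the original question.

    ```scala
    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    // Local session for illustration only (master and app name are assumptions).
    val spark = SparkSession.builder().master("local[*]").appName("rdd-to-df").getOrCreate()
    import spark.implicits._

    // Made-up sample data standing in for the real RDD[Vector].
    val rdd: RDD[Vector] = spark.sparkContext.parallelize(Seq(
      Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
      Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)
    ))

    // Wrap each Vector in Tuple1 so that toDF can derive a one-column schema.
    val df = rdd.map(Tuple1(_)).toDF("features")

    // KMeans reads its input from the "features" column by default.
    val model = new KMeans().setK(2).setSeed(1L).fit(df)

    // transform appends a "prediction" column holding the cluster index.
    val predictions = model.transform(df)
    ```

    Calling df.printSchema() at this point is a quick way to confirm that the column is named features and has the vector type before passing it to an estimator.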


    To convert to a dataset the easiest way is to use a case class. Make sure the case class is defined outside the Main object. First convert the RDD to a dataframe, then do the following:

    case class A(features: org.apache.spark.ml.linalg.Vector)
    
    val ds = df.as[A]
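
    As a possible alternative, the intermediate dataframe can be skipped by mapping the RDD straight into the case class and calling toDS(). This is only a sketch: the case class name FeatureRow, the local master and the sample vectors are hypothetical.

    ```scala
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.sql.{Dataset, SparkSession}

    // Hypothetical case class; keep it at the top level, not inside a method.
    case class FeatureRow(features: Vector)

    // Local session for illustration only (master and app name are assumptions).
    val spark = SparkSession.builder().master("local[*]").appName("rdd-to-ds").getOrCreate()
    import spark.implicits._

    // Made-up sample data standing in for the real RDD[Vector].
    val rdd = spark.sparkContext.parallelize(Seq(
      Vectors.dense(1.0, 2.0),
      Vectors.dense(3.0, 4.0)
    ))

    // Map each Vector into the case class, then let toDS derive the encoder.
    val ds: Dataset[FeatureRow] = rdd.map(FeatureRow(_)).toDS()
    ```

    The column name comes from the case class field, so the field has to be called features for the Dataset to be usable by estimators with default settings.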
    

    For completeness, the conversion in the other direction: the underlying RDD of a dataframe or dataset can be accessed using .rdd:

    val rdd = df.rdd
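
    Note that .rdd on a dataframe yields an RDD[Row] rather than an RDD[Vector]. A minimal sketch of getting the vectors back out follows; the sample dataframe and the local master are assumptions for illustration.

    ```scala
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}

    // Local session for illustration only (master and app name are assumptions).
    val spark = SparkSession.builder().master("local[*]").appName("df-to-rdd").getOrCreate()
    import spark.implicits._

    // Made-up one-column dataframe of vectors.
    val df = Seq(
      Tuple1(Vectors.dense(1.0, 2.0)),
      Tuple1(Vectors.dense(3.0, 4.0))
    ).toDF("features")

    // A dataframe's underlying RDD holds generic Row objects.
    val rows: RDD[Row] = df.rdd

    // Pull the typed Vector out of each Row by column name.
    val vectors: RDD[Vector] = rows.map(_.getAs[Vector]("features"))
    ```

    For a typed Dataset[A], .rdd returns an RDD[A] directly, so the getAs step is not needed there.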
    

    Instead of converting back and forth between RDDs and dataframes/datasets, it's usually easier to do all the computations using the dataframe API. If there is no suitable built-in function for what you want, it's usually possible to define a UDF (user-defined function). See for example here: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html
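
    To illustrate the UDF route, here is a minimal sketch that computes the Euclidean norm of each feature vector without leaving the dataframe API. The UDF name l2Norm, the output column name norm, the local master and the sample data are all hypothetical choices.

    ```scala
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, udf}

    // Local session for illustration only (master and app name are assumptions).
    val spark = SparkSession.builder().master("local[*]").appName("vector-udf").getOrCreate()
    import spark.implicits._

    // Made-up one-column dataframe of vectors.
    val df = Seq(Tuple1(Vectors.dense(3.0, 4.0))).toDF("features")

    // Hypothetical UDF: Euclidean (L2) norm of a feature vector.
    val l2Norm = udf((v: Vector) => Vectors.norm(v, 2.0))

    // Apply the UDF as a column expression; no round trip through an RDD.
    val withNorm = df.withColumn("norm", l2Norm(col("features")))
    ```

    The same pattern works for other per-row computations on a vector column, as long as the function's return type is one that Spark SQL can represent.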