Tags: apache-spark, pyspark, apache-spark-sql, apache-spark-mllib, apache-spark-ml

How do I convert an RDD with a SparseVector column into a DataFrame with a Vector column?


I have an RDD of (String, SparseVector) tuples and I want to create a DataFrame from it, in order to get a (label: string, features: vector) DataFrame, which is the schema required by most of the ml library's algorithms. I know it should be possible, because the HashingTF ml transformer outputs a vector column when applied to the tokens column of a DataFrame.

# assuming temp_rdd is an RDD of (double, array(string)) pairs
from pyspark.sql.types import StructType, StructField, DoubleType, ArrayType, StringType
from pyspark.ml.feature import HashingTF

temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
        StructField("label", DoubleType(), False),
        StructField("tokens", ArrayType(StringType()), False)
    ]))

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")

ndf = hashingTF.transform(temp_df)
ndf.printSchema()

#outputs 
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#|    |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)

So my question is: given an RDD of (String, SparseVector), can I somehow convert it to a DataFrame of (String, vector)? I tried the usual sqlContext.createDataFrame, but there is no built-in DataType that fits my needs.

df = sqlContext.createDataFrame(rdd, StructType([
        StructField("label", StringType(), True),
        StructField("features", ?Type(), True)
    ]))

Solution

  • You have to use VectorUDT here:

    # In Spark 1.x import from mllib instead:
    # from pyspark.mllib.linalg import SparseVector, VectorUDT
    from pyspark.ml.linalg import SparseVector, VectorUDT
    from pyspark.sql.types import StructType, StructField, DoubleType
    
    temp_rdd = sc.parallelize([
        (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
        (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])
    
    schema = StructType([
        StructField("label", DoubleType(), True),
        StructField("features", VectorUDT(), True)
    ])
    
    temp_rdd.toDF(schema).printSchema()
    
    ## root
    ##  |-- label: double (nullable = true)
    ##  |-- features: vector (nullable = true)
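
    Just as a side note, the explicit schema is not strictly required on the Python side: PySpark's schema inference recognizes the vector UDT from the SparseVector objects themselves. Assuming an active SparkSession, the following sketch should yield the same schema:

    # Schema inference picks up VectorUDT directly from the objects
    temp_rdd.toDF(["label", "features"]).printSchema()

    ## root
    ##  |-- label: double (nullable = true)
    ##  |-- features: vector (nullable = true)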
    

    Just for completeness, the Scala equivalent:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{DoubleType, StructType}
    // In Spark 1.x
    // import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
    
    val schema = new StructType()
      .add("label", DoubleType)
      // In Spark 1.x
      // .add("features", new VectorUDT())
      .add("features", VectorType)
    
    val temp_rdd: RDD[Row] = sc.parallelize(Seq(
      Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))),
      Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5))))
    ))
    
    spark.createDataFrame(temp_rdd, schema).printSchema
    
    // root
    // |-- label: double (nullable = true)
    // |-- features: vector (nullable = true)
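
    Back in PySpark, a quick way to confirm that the resulting features column is usable downstream is to fit one of the ml estimators on it. This is only a minimal sketch, reusing temp_rdd and schema from the Python snippet above; the choice of LogisticRegression is purely illustrative, and any estimator with a featuresCol parameter should behave the same way:

    from pyspark.ml.classification import LogisticRegression

    temp_df = temp_rdd.toDF(schema)
    # The vector column is consumed as-is, no further conversion needed
    lr = LogisticRegression(labelCol="label", featuresCol="features")
    model = lr.fit(temp_df)
    print(model.coefficients)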