python, apache-spark, apache-spark-sql, apache-spark-ml

Create LabeledPoints from a Spark DataFrame & how to pass a list of names to VectorAssembler


I have a follow-up question to https://stackoverflow.com/a/32557330/5235052. I am trying to build LabeledPoints from a DataFrame, where I have the features and the label in columns. The features are all boolean, encoded as 1/0.

Here is a sample row from the DataFrame:

|             0|       0|        0|            0|       0|            0|     1|        0|     0|           0|       0|       0|       0|           0|        0|         0|      0|            0|       0|           0|          0|         0|         0|              0|        0|        0|        0|         0|          0|    1|    0|    1|    0|    0|       0|           0|    0|     0|     0|     0|         0|         1|
# Using the code from the answer above,
# create a list of feature names from the column names of the DataFrame
df_columns = []
for c in df.columns:
    if c == 'is_item_return': continue
    df_columns.append(c)

# using VectorAssembler for the transformation;
# df_columns[0:5] selects the first 5 column names
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler()
assembler.setInputCols(df_columns[0:5])
assembler.setOutputCol('features')

transformed = assembler.transform(df)

# mapping also from the answer linked above
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

new_df = transformed.select(col('is_item_return'), col("features")).map(lambda row: LabeledPoint(row.is_item_return, row.features))

When I inspect the contents of the RDD, I get the right label, but the feature vector is wrong.

(0.0,(5,[],[]))

Could someone help me understand how to pass the column names of an existing DataFrame as feature names to the VectorAssembler?


Solution

  • There is nothing wrong here. What you get is the string representation of a SparseVector, which exactly reflects your input:

    • you take the first five columns (assembler.setInputCols(df_columns[0:5])), so the output vector is of length 5
    • since the first five columns of the example input contain no non-zero entries, the indices and values arrays are empty

    To illustrate this, let's use Scala, which provides the useful toSparse / toDense methods:

    import org.apache.spark.mllib.linalg.Vectors
    
    val v = Vectors.dense(Array(0.0, 0.0, 0.0, 0.0, 0.0))
    v.toSparse.toString
    // String = (5,[],[])
    
    v.toSparse.toDense.toString
    // String = [0.0,0.0,0.0,0.0,0.0]
    

    So with PySpark:

    from pyspark.ml.feature import VectorAssembler
    
    df = sc.parallelize([
        tuple([0.0] * 5),
        tuple([1.0] * 5), 
        (1.0, 0.0, 1.0, 0.0, 1.0),
        (0.0, 1.0, 0.0, 1.0, 0.0)
    ]).toDF()
    
    features = (VectorAssembler(inputCols=df.columns, outputCol="features")
        .transform(df)
        .select("features"))
    
    features.show(4, False)
    
    ## +---------------------+
    ## |features             |
    ## +---------------------+
    ## |(5,[],[])            |
    ## |[1.0,1.0,1.0,1.0,1.0]|
    ## |[1.0,0.0,1.0,0.0,1.0]|
    ## |(5,[1,3],[1.0,1.0])  |
    ## +---------------------+
    

    It also shows that the assembler chooses a different representation depending on the number of non-zero entries.

    features.flatMap(lambda x: x).map(type).collect()
    
    ## [pyspark.mllib.linalg.SparseVector,
    ##  pyspark.mllib.linalg.DenseVector,
    ##  pyspark.mllib.linalg.DenseVector,
    ##  pyspark.mllib.linalg.SparseVector]
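
    Applying this back to the original question: pass all of the feature columns (everything except the label) to the assembler instead of only the first five. A minimal sketch, assuming the original df with an is_item_return label column and 0/1 feature columns (Spark 1.x, where map can be called directly on a DataFrame):

    from pyspark.ml.feature import VectorAssembler
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.sql.functions import col
    
    # every column except the label becomes a feature
    feature_cols = [c for c in df.columns if c != 'is_item_return']
    
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    
    labeled_points = (assembler
        .transform(df)
        .select(col('is_item_return'), col('features'))
        .map(lambda row: LabeledPoint(row.is_item_return, row.features)))
    # on Spark 2.x+, call .rdd.map(...) instead of .map(...)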