
Fit a dataframe into randomForest pyspark


I have a DataFrame that looks like this:

+--------------------+------------------+
|            features|           labels |
+--------------------+------------------+
|[-0.38475, 0.568...]|          label1  |
|[0.645734, 0.699...]|          label2  |
|     .....          |          ...     |
+--------------------+------------------+

Both columns are of string type (StringType()). I would like to fit this DataFrame into Spark ML's random forest. To do so, I need to convert the features column into a vector of floats. Does anyone have an idea how to do this?


Solution

  • If you are using Spark 2.x, I believe this is what you need:

    from pyspark.sql.functions import udf
    from pyspark.mllib.linalg import Vectors
    from pyspark.ml.linalg import VectorUDT
    from pyspark.ml.feature import StringIndexer
    
    df = spark.createDataFrame([("[-0.38475, 0.568]", "label1"), ("[0.645734, 0.699]", "label2")], ("features", "label"))
    
    def parse(s):
      try:
        # Vectors.parse understands the "[x, y, ...]" string syntax; asML()
        # converts the resulting mllib vector into the ml vector type that
        # spark.ml estimators expect.
        return Vectors.parse(s).asML()
      except Exception:
        # Return null for malformed rows instead of failing the whole job.
        return None
    
    parse_ = udf(parse, VectorUDT())
    
    parsed = df.withColumn("features", parse_("features"))
    
    indexer = StringIndexer(inputCol="label", outputCol="label_indexed")
    
    indexer.fit(parsed).transform(parsed).show()
    ## +----------------+------+-------------+
    ## |        features| label|label_indexed|
    ## +----------------+------+-------------+
    ## |[-0.38475,0.568]|label1|          0.0|
    ## |[0.645734,0.699]|label2|          1.0|
    ## +----------------+------+-------------+
    

    With Spark 1.6, it isn't much different:

    from pyspark.sql.functions import udf
    from pyspark.ml.feature import StringIndexer
    from pyspark.mllib.linalg import Vectors, VectorUDT
    
    df = sqlContext.createDataFrame([("[-0.38475, 0.568]", "label1"), ("[0.645734, 0.699]", "label2")], ("features", "label"))
    
    parse_ = udf(Vectors.parse, VectorUDT())
    
    parsed = df.withColumn("features", parse_("features"))
    
    indexer = StringIndexer(inputCol="label", outputCol="label_indexed")
    
    indexer.fit(parsed).transform(parsed).show()
    ## +----------------+------+-------------+
    ## |        features| label|label_indexed|
    ## +----------------+------+-------------+
    ## |[-0.38475,0.568]|label1|          0.0|
    ## |[0.645734,0.699]|label2|          1.0|
    ## +----------------+------+-------------+
    

    Either way, the mllib Vectors class provides a parse method that does exactly the string-to-vector conversion you are after.