Create LabeledPoints from a Spark DataFrame in Python


Which .map() function do I use in Python to create a set of LabeledPoints from a Spark DataFrame? What is the notation if the label/outcome is not the first column, but I can refer to it by its column name, 'status'?

I create the pandas DataFrame with this .map() function:

import pandas as pd

def parsePoint(line):
    # Split the tab-separated record; the first field is not a feature.
    listmp = line.split('\t')
    # One-hot encode the remaining fields and sum them into a single row of counts.
    dataframe = pd.DataFrame(pd.get_dummies(listmp[1:]).sum()).transpose()
    # Label goes first: the 'accepted' count, or 0 when the record has no 'accepted' field
    # (an unguarded dataframe['accepted'] lookup raises KeyError on such records).
    status = dataframe['accepted'] if 'accepted' in dataframe.columns else 0
    dataframe.insert(0, 'status', status)
    # Drop placeholder values and the raw outcome columns; missing ones are ignored.
    return dataframe.drop(columns=['NULL', '', 'rejected', 'accepted'], errors='ignore')
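
To make the encoding step concrete, here is a minimal sketch of what the get_dummies/sum/transpose chain produces for one record. The sample line and its layout (an id followed by categorical fields) are assumptions made up for illustration, not data from the original post.

```python
import pandas as pd

# Hypothetical record: an id followed by tab-separated categorical fields.
line = "42\taccepted\tred\tNULL\tred"
fields = line.split("\t")[1:]  # skip the leading id, as parsePoint does

# get_dummies yields one indicator column per distinct value; summing the
# rows turns the indicators into per-value counts, and transpose lays the
# counts out as a single row with one column per value.
row = pd.DataFrame(pd.get_dummies(fields).sum()).transpose()

print(row.to_dict("records"))
```

Each distinct field value becomes a column holding its count, which is why placeholder values such as 'NULL' show up as columns that then have to be dropped.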

I convert it to a Spark dataframe after the reduce function has recombined all the Pandas dataframes.

parsedData = sqlContext.createDataFrame(parsedData)

But how do I now create LabeledPoints from this in Python? I assume it may be another .map() function?


Solution

  • If your features are already numerical and need no additional transformations, you can use VectorAssembler to combine the columns containing the independent variables:

    from pyspark.ml.feature import VectorAssembler
    
    assembler = VectorAssembler(
        inputCols=["your", "independent", "variables"],
        outputCol="features")
    
    transformed = assembler.transform(parsedData)
    

    Next you can simply map:

    from pyspark.mllib.regression import LabeledPoint
    from pyspark.sql.functions import col
    
    (transformed.select(col("outcome_column").alias("label"), col("features"))
      .rdd
      .map(lambda row: LabeledPoint(row.label, row.features)))
    

    As of Spark 2.0, the ml and mllib APIs are no longer compatible, and the latter is headed towards deprecation and removal. If you still need mllib, you'll have to convert ml.Vectors to mllib.Vectors:

    from pyspark.mllib import linalg as mllib_linalg
    from pyspark.ml import linalg as ml_linalg
    
    def as_old(v):
        if isinstance(v, ml_linalg.SparseVector):
            return mllib_linalg.SparseVector(v.size, v.indices, v.values)
        if isinstance(v, ml_linalg.DenseVector):
            return mllib_linalg.DenseVector(v.values)
        raise ValueError("Unsupported type {0}".format(type(v)))
    

    and map:

    .map(lambda row: LabeledPoint(row.label, as_old(row.features)))
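
Putting the pieces together for the case in the question, where the label is identified by name ('status') rather than by position, a minimal sketch might look like the following. It assumes Spark 2.0 or later and that every non-label column is numeric. The helper names feature_columns and to_labeled_points are hypothetical, and the sketch uses the built-in Vectors.fromML converter from pyspark.mllib.linalg in place of a hand-written conversion.

```python
def feature_columns(columns, label_col="status"):
    """Return every column name except the label, preserving order."""
    if label_col not in columns:
        raise ValueError("Label column {0!r} not found".format(label_col))
    return [c for c in columns if c != label_col]


def to_labeled_points(df, label_col="status"):
    """Turn a Spark DataFrame of numeric columns into an RDD of LabeledPoints."""
    # Imported lazily so feature_columns stays usable without Spark installed.
    from pyspark.ml.feature import VectorAssembler
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint

    # Assemble all non-label columns into a single ml vector column.
    assembler = VectorAssembler(
        inputCols=feature_columns(df.columns, label_col),
        outputCol="features")
    assembled = assembler.transform(df).select(label_col, "features")

    # LabeledPoint expects an mllib vector, so convert each ml vector.
    return assembled.rdd.map(
        lambda row: LabeledPoint(row[label_col], Vectors.fromML(row["features"])))
```

Under these assumptions, to_labeled_points(parsedData) would return an RDD that can be passed to the mllib training APIs. Selecting the label by name means the column order of parsedData does not matter.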