Search code examples
pythonapache-sparkpysparkapache-spark-sqlapache-spark-ml

PySpark insert a constant SparseVector in a Dataframe column


I wish to insert in my dataframe tfIdfFr a column named "ref" with a constant whose the type is pyspark.ml.linalg.SparseVector.

When I try this

ref = tfidfTest.select("features").collect()[0].features # the reference
tfIdfFr.withColumn("ref", ref).select("ref", "features").show()

I get this error AssertionError: col should be Column

And when i try this:

from pyspark.sql.functions import lit
tfIdfFr.withColumn("ref", lit(ref)).select("ref", "features").show()

I get that error AttributeError: 'SparseVector' object has no attribute '_get_object_id'

Do you know a solution to insert a constant SparseVector in a Dataframe column?*


Solution

  • In this case you I'd just skip collect:

    ref = tfidfTest.select(col("features").alias("ref")).limit(1)
    tfIdfFr.crossJoin(ref)
    

    In general you can either use udf:

    from pyspark.ml.linalg import DenseVector, SparseVector, Vector, Vectors, \
     VectorUDT 
    from pyspark.sql.functions import udf
    
    def vector_lit(v): 
        assert isinstance(v, Vector) 
        return udf(lambda: v, VectorUDT())() 
    

    Usage:

    spark.range(1).select(
      vector_lit(Vectors.sparse(5, [1, 3], [-1, 1])
    ).alias("ref")).show()
    
    +--------------------+
    |                 ref|
    +--------------------+
    |(5,[1,3],[-1.0,1.0])|
    +--------------------+
    
    spark.range(1).select(vector_lit(Vectors.dense([1, 2, 3])).alias("ref")).show() 
    
    +-------------+
    |          ref|
    +-------------+
    |[1.0,2.0,3.0]|
    +-------------+
    

    It is also possible to use intermediate representation:

    import json
    from pyspark.sql.functions import from_json, lit
    from pyspark.sql.types import StructType, StructField
    
    def as_column(v):
        assert isinstance(v, Vector) 
        if isinstance(v, DenseVector):
            j = lit(json.dumps({"v": {
              "type": 1,
              "values": v.values.tolist()
            }}))
        else:
            j = lit(json.dumps({"v": {
              "type": 0,
              "size": v.size,
              "indices": v.indices.tolist(),
              "values": v.values.tolist()
            }}))
        return from_json(j, StructType([StructField("v", VectorUDT())]))["v"]
    

    Usage:

    spark.range(1).select(
        as_column(Vectors.sparse(5, [1, 3], [-1, 1])
     ).alias("ref")).show()  
    
    +--------------------+
    |                 ref|
    +--------------------+
    |(5,[1,3],[-1.0,1.0])|
    +--------------------+
    
    spark.range(1).select(as_column(Vectors.dense([1, 2, 3])).alias("ref")).show()
    
    +-------------+
    |          ref|
    +-------------+
    |[1.0,2.0,3.0]|
    +-------------+