Tags: string, apache-spark, pyspark, type-conversion, vectorization

Pyspark: How to convert a string (created from a dense vector) back to a dense vector?


I have a big dataset (about 10 million rows) and I'm looking for an efficient way to recreate dense vectors from their string representations.

This is my schema:

root
 |-- features: string (nullable = true)
 |-- id_index: double (nullable = true)

And this is the first row:

train.first() 


Row(features='[-1.8744251359864337,0.8208032878135908,1.6772737952383912,0.5074761601167237,-0.9327241948725055,1.064324833351145,-0.026543021475899584,-0.2738297628597614,1.1621882143427753,0.022718595764125882,-0.480804744856163,-0.058405708900107677,0.05971905240143063,-0.3469121380857816,-0.18753641543435115,-0.07209073425907712,0.3231645936694398,0.19913281255794962,-0.27914981007260486,-0.14564720252350738,0.20391682163361805,-0.32573666381677435,0.7576647591212007,0.4242633700261033,-0.15593357299211452,0.017449221887097507,0.05121680297513904,0.5842733444225926,0.10450917006313973,-0.24553120193983335,-0.5334612434119697,0.5517353774258191,-0.3116056252939926,-0.9396807558084017,0.12348781369817632,0.6166678815053761,0.05457562154488685,-0.13311701358504352,0.003852337914245302,-0.3513220177034468,0.23513621861470274,0.30291278930119236,-0.29289442414132855]', id_index=34823.0)

The features column was created using PCA; to resample, I had to convert the vectors to strings, and now I want to recreate the dense vectors so I can work with spark.ml.
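
For reference, here is a minimal sketch of how a column like this can arise (toy values; I'm assuming a simple str() conversion stands in for the actual resampling step):

import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors

# assumes an active SparkSession named spark (e.g. the pyspark shell)
df = spark.createDataFrame(
    [(Vectors.dense([-1.87, 0.82, 1.68]), 34823.0)],
    ['features', 'id_index'])

# str(DenseVector) yields '[-1.87,0.82,1.68]', the same format as above
vector_to_str = F.udf(lambda v: str(v), 'string')
df = df.withColumn('features', vector_to_str('features'))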

Any tips?

Thanks!


Solution

  • You can parse the array string with from_json and then build a dense vector with array_to_vector, for Spark versions >= 3.1.0:

    import pyspark.sql.functions as F
    from pyspark.ml.functions import array_to_vector
    
    # parse the JSON-style array string, then convert the array<double> to a dense vector
    train = train.withColumn('features_vector', array_to_vector(F.from_json('features', "array<double>")))
    train.printSchema()
    
    # root
    #  |-- features: string (nullable = true)
    #  |-- id_index: double (nullable = true)
    #  |-- features_vector: vector (nullable = true)
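
    You can sanity-check the result by inspecting the first row (output abbreviated here):

    train.select('features_vector').first()
    # Row(features_vector=DenseVector([-1.8744, 0.8208, 1.6773, ...]))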
    

    Or, for Spark versions < 3.1.0, use a UDF:

    import pyspark.sql.functions as F
    from pyspark.ml.linalg import Vectors, VectorUDT
    
    # UDF that wraps the parsed Python list in a DenseVector
    arraytovector = F.udf(lambda vs: Vectors.dense(vs), VectorUDT())
    
    train = train.withColumn('features_vector', arraytovector(F.from_json('features', "array<double>")))
    train.printSchema()
    
    # root
    #  |-- features: string (nullable = true)
    #  |-- id_index: double (nullable = true)
    #  |-- features_vector: vector (nullable = true)
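
    On ~10 million rows, prefer the native array_to_vector path where available: a Python UDF serializes every row to a Python worker and is noticeably slower. If you'd rather skip from_json entirely, the parsing can also live inside the UDF; a minimal sketch, assuming non-null strings of the form '[v1,v2,...]':

    import pyspark.sql.functions as F
    from pyspark.ml.linalg import Vectors, VectorUDT

    # parse the bracketed string and build the vector in one step
    # (assumes no null or malformed values; add guards if needed)
    string_to_vector = F.udf(
        lambda s: Vectors.dense([float(x) for x in s.strip('[]').split(',')]),
        VectorUDT())

    train = train.withColumn('features_vector', string_to_vector('features'))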