Tags: scala, apache-spark, apache-spark-mllib

How to shuffle a sparse vector in Spark using Scala


I have a sparse vector in Spark and I want to randomly shuffle (reorder) its contents. The vector is actually a tf-idf vector, and I want to reorder it so that in my new dataset the features appear in a different order. Is there any way to do this using Scala? This is my code for generating the tf-idf vectors:

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, IDF, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val wordsData = tokenizer.transform(data).cache()
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .fit(wordsData)
val featurizedData = cvModel.transform(wordsData).cache()
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData).cache()

Solution

  • Perhaps this is useful:

    Load the test data

    val data = Array(
      Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
      Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
      Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
    )
    val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
    df.show(false)
    df.printSchema()
    
        /**
          * +---------------------+
          * |features             |
          * +---------------------+
          * |(5,[1,3],[1.0,7.0])  |
          * |[2.0,0.0,3.0,4.0,5.0]|
          * |[4.0,0.0,0.0,6.0,7.0]|
          * +---------------------+
          *
          * root
          * |-- features: vector (nullable = true)
          */
    

    Shuffle the vector

    val shuffleVector = udf((vector: Vector) =>
      Vectors.dense(scala.util.Random.shuffle(mutable.WrappedArray.make[Double](vector.toArray)).toArray)
    )

    val p = df.withColumn("shuffled_vector", shuffleVector($"features"))
    p.show(false)
    p.printSchema()
    
        /**
          * +---------------------+---------------------+
          * |features             |shuffled_vector      |
          * +---------------------+---------------------+
          * |(5,[1,3],[1.0,7.0])  |[1.0,0.0,0.0,0.0,7.0]|
          * |[2.0,0.0,3.0,4.0,5.0]|[0.0,3.0,2.0,5.0,4.0]|
          * |[4.0,0.0,0.0,6.0,7.0]|[4.0,7.0,6.0,0.0,0.0]|
          * +---------------------+---------------------+
          *
          * root
          * |-- features: vector (nullable = true)
          * |-- shuffled_vector: vector (nullable = true)
          */
    

    You can also wrap the above udf in a custom Transformer and put it in a pipeline.

    Please make sure to use import org.apache.spark.ml.linalg._ (not org.apache.spark.mllib.linalg._), since the DataFrame-based API expects the ml Vector type.
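    For reference, the snippets above rely on the following imports (assuming a SparkSession named spark, with spark.implicits._ in scope for the $ syntax):

    ```scala
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.sql.functions.udf
    import scala.collection.mutable
    ```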

    Update 1: convert the shuffled vector to sparse

    val shuffleVectorToSparse = udf((vector: Vector) =>
      Vectors.dense(scala.util.Random.shuffle(mutable.WrappedArray.make[Double](vector.toArray)).toArray).toSparse
    )

    val p1 = df.withColumn("shuffled_vector", shuffleVectorToSparse($"features"))
    p1.show(false)
    p1.printSchema()
    
        /**
          * +---------------------+-------------------------------+
          * |features             |shuffled_vector                |
          * +---------------------+-------------------------------+
          * |(5,[1,3],[1.0,7.0])  |(5,[0,3],[1.0,7.0])            |
          * |[2.0,0.0,3.0,4.0,5.0]|(5,[1,2,3,4],[5.0,3.0,2.0,4.0])|
          * |[4.0,0.0,0.0,6.0,7.0]|(5,[1,3,4],[7.0,4.0,6.0])      |
          * +---------------------+-------------------------------+
          *
          * root
          * |-- features: vector (nullable = true)
          * |-- shuffled_vector: vector (nullable = true)
          */
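
    The .toSparse call above simply drops the zero entries and keeps the vector's size. As a plain-Scala illustration of what it produces (toSparsePairs is a hypothetical helper, not the Spark API):

    ```scala
    // Mimic what Vectors.dense(...).toSparse records: the vector's
    // length plus only the nonzero entries as (index, value) pairs.
    def toSparsePairs(values: Array[Double]): (Int, Seq[(Int, Double)]) = {
      val nonZero = values.zipWithIndex.collect { case (v, i) if v != 0.0 => (i, v) }
      (values.length, nonZero.toSeq)
    }

    toSparsePairs(Array(1.0, 0.0, 0.0, 0.0, 7.0))
    // → (5, Seq((0,1.0), (4,7.0))), matching the (size,[indices],[values]) notation in show()
    ```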