Tags: scala, apache-spark, apache-spark-sql, scala-breeze

Building a SparseVector out of values in a DataFrame


I've been trying to extract the values of a 1-column Spark DataFrame of Doubles and put them into a Breeze SparseVector. To do this, I iterate over every row of the DataFrame, cast the value to a Double, and add it to a VectorBuilder. The VectorBuilder correctly mutates its state inside the foreach loop, but all of those changes are gone once the loop ends. Why does this happen? Is there a workaround?

EDIT 1:

I'm running this locally with 1 core; it's not on a cluster

Code:

val accum = sc.accumulator(0, "Counter") 

def correlate() : Unit = {

  val cols = df.columns
  val id = cols(0)       
  val id2 = cols(1)

  // id and id2 are used below to filter out rows with nulls
  val df1 = sqlContext.sql(s"SELECT ${id} FROM dataset WHERE (${id} IS NOT NULL AND ${id2} IS NOT NULL)")

  /* df1 is a dataframe that has 1 column*/   
  df1.show();
  accum.value_=(0);

  /******************** Problem starts here **********************/
  val builder = new VectorBuilder[Double](5)
  df1.foreach{ x =>
    x(0) match{             
      case d : Double => 
        builder.add(accum.value, d); 
        //This print statement prints out correct values
        println(s"index: ${accum.value} value: ${builder(accum.value)}")      
        accum.value += 1;
        println(s"builder's active size in loop: ${builder.activeSize}")  
      case _ => throw new ClassCastException("Pattern-Matching for Double failed");
    } 
  }
  //builder is empty again at this point
  println(s"builder's active size out of loop: ${builder.activeSize}")

  val sparse = builder.toSparseVector     
  sparse.foreachPair{(i,v) => println(s"index: ${i} and value: ${v}")}
}
this.correlate()

Output:

+-------+
|   RowX|
+-------+
|  145.0|
|   -1.0|
|-212.21|
|   23.3|
|   21.4|
+-------+

index: 0 value: 145.0
builder's active size in loop: 1
index: 1 value: -1.0
builder's active size in loop: 2
index: 2 value: -212.21
builder's active size in loop: 3
index: 3 value: 23.3
builder's active size in loop: 4
index: 4 value: 21.4
builder's active size in loop: 5

//the loop ends here and builder's state disappears

builder's active size out of loop: 0
index: 0 and value: 0.0
index: 1 and value: 0.0
index: 2 and value: 0.0
index: 3 and value: 0.0
index: 4 and value: 0.0

Solution

  • The foreach closure runs inside Spark tasks, so each task adds to its own serialized copy of builder; the driver's builder is never modified. To build the vector on the driver, collect the values first:

    SparseVector(df1.rdd.map(_.getDouble(0)).collect)
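    The copy-on-serialization behavior can be demonstrated without Spark at all. The sketch below (`Builder` and `shipToTask` are stand-ins invented for illustration, not Spark or Breeze APIs) round-trips a mutable object through Java serialization, which is essentially what Spark does to a closure and everything it captures before running it in a task: mutations land on the deserialized copy, while the original on the driver stays untouched.

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import scala.collection.mutable.ArrayBuffer

    // Stand-in for the Breeze VectorBuilder: a serializable mutable holder.
    class Builder extends Serializable {
      val values = ArrayBuffer.empty[Double]
      def add(v: Double): Unit = values += v
    }

    // Round-trip an object through Java serialization, mimicking what Spark
    // does to a closure (and everything it captures) before running a task.
    def shipToTask[T](obj: T): T = {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(obj)
      oos.close()
      new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        .readObject().asInstanceOf[T]
    }

    val driverBuilder = new Builder
    val taskBuilder = shipToTask(driverBuilder) // the task mutates this private copy

    Seq(145.0, -1.0, -212.21, 23.3, 21.4).foreach(taskBuilder.add)

    println(s"task copy size:   ${taskBuilder.values.size}")   // 5
    println(s"driver copy size: ${driverBuilder.values.size}") // 0
    ```

    This is why `collect`-ing the values to the driver first (as in the one-liner above) works: the building then happens entirely in driver-side code, on the one real object.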