scala, apache-spark, join, rdd

How to join a random RDD to another RDD?


I have an RDD of strings (though it could be anything, really) that I would like to inner join with an RDD of random normals. I know this can be done by calling .zipWithIndex on both RDDs and joining on the index, but that doesn't seem like it will scale well. Is there a way to initialize a random RDD with data from another RDD, or some other approach that would be faster? Here is what I've done with .zipWithIndex:

import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.rdd.RDD

val numExamples = 10      // number of rows in the RDD
val maNum = 7             // number of columns in each random vector
val commonStdDev = 0.1    // common standard deviation of 1/10, so variance = 0.01
val normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = maNum)

// Rescale the standard normals, then key each row by its index
val rescaledNormals = normalVectorRDD
  .map(myVec => myVec.toArray.map(x => x * commonStdDev))
  .zipWithIndex
  .map { case (values, index) => (index, values) }

// Key each line of the input file by its index
val otherRDD = sc.textFile(otherFilepath)
  .zipWithIndex
  .map { case (line, index) => (index, line) }

// Join on the index, then drop it
val joinedRDD = otherRDD.join(rescaledNormals).map { case (_, (other, dArray)) => (other, dArray) }

Solution

  • In general I wouldn't worry about zipWithIndex. Although it triggers an additional Spark job (to count the elements in each partition), it is a relatively cheap operation. The join, however, is a different matter, since it requires a shuffle.

    Since the vector content doesn't depend on the values in otherRDD, it makes more sense to generate the vectors in place. All you have to do is mimic the RandomRDDs logic:

    import org.apache.spark.mllib.random.StandardNormalGenerator
    import org.apache.spark.ml.linalg.DenseVector  // or org.apache.spark.mllib.linalg.DenseVector

    val vectorSize = 42
    val stdDev = 0.1
    val seed = scala.util.Random.nextLong()  // or set manually for reproducible output

    // Define a seed for each partition
    val random = new scala.util.Random(seed)
    val seeds = (0 until otherRDD.getNumPartitions).map(
      i => i -> random.nextLong()
    ).toMap

    // otherRDD is the original RDD; no zipWithIndex or join is needed
    otherRDD.mapPartitionsWithIndex((i, iter) => {
      // One generator per partition, seeded independently
      val generator = new StandardNormalGenerator()
      generator.setSeed(seeds(i))
      iter.map(x =>
        (x, new DenseVector(Array.fill(vectorSize)(generator.nextValue() * stdDev)))
      )
    })
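
To see why the per-partition seeds matter, here is a minimal Spark-free sketch of the same idea. It is an illustration under assumptions, not the Spark code itself: a nested Seq stands in for the partitions of an RDD, scala.util.Random.nextGaussian() stands in for StandardNormalGenerator.nextValue(), and the names PerPartitionSeeds, partitionSeeds and attachNormals are hypothetical.

```scala
import scala.util.Random

object PerPartitionSeeds {
  // Hypothetical stand-in for an RDD: each inner Seq plays the role of one partition.
  type Partitions[A] = Seq[Seq[A]]

  // Derive one seed per partition index from a single master seed.
  def partitionSeeds(masterSeed: Long, numPartitions: Int): Map[Int, Long] = {
    val random = new Random(masterSeed)
    (0 until numPartitions).map(i => i -> random.nextLong()).toMap
  }

  // Pair every element with a scaled normal vector, using one generator per partition.
  // Assumption: nextGaussian() replaces StandardNormalGenerator.nextValue().
  def attachNormals[A](
      data: Partitions[A],
      vectorSize: Int,
      stdDev: Double,
      masterSeed: Long
  ): Partitions[(A, Array[Double])] = {
    val seeds = partitionSeeds(masterSeed, data.length)
    data.zipWithIndex.map { case (partition, i) =>
      val generator = new Random(seeds(i))
      partition.map(x => (x, Array.fill(vectorSize)(generator.nextGaussian() * stdDev)))
    }
  }
}
```

Calling attachNormals twice with the same master seed and the same partitioning should give the same vectors, which is the property the seeds map is meant to provide if a partition is ever recomputed. The result still depends on how the data is partitioned and on the order of elements within each partition, so repartitioning would change which vector lands on which row.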