scala, apache-spark, dbscan, rdd

Apache Spark distance between two points using squaredDistance


I have an RDD collection of vectors, where each vector represents a point with x and y coordinates. For example, the input file looks like this:

1.1 1.2
6.1 4.8
0.1 0.1
9.0 9.0
9.1 9.1
0.4 2.1

I am reading it:

  import breeze.linalg.{DenseVector, Vector, squaredDistance} // assuming Breeze for the vector types

  def parseVector(line: String): Vector[Double] = {
    DenseVector(line.split(' ').map(_.toDouble))
  }

  val lines = sc.textFile(inputFile)
  val points = lines.map(parseVector).cache()

Also, I have an epsilon:

  val eps = 2.0

For each point I want to find its neighbors that are within the epsilon distance. I do:

points.foreach { point =>
  // squaredDistance(point, ?) what should I write here?
}

How can I loop over all the points and, for each point, find its neighbors? Probably using a map function?


Solution

  • You could do something like:

    val distanceBetweenPoints = points.cartesian(points)
        .filter { case (x, y) => x != y } // remove the (x, x) diagonal
        .map { case (x, y) => ((x, y), squaredDistance(x, y)) }
    // squaredDistance omits the square root, so compare against eps squared
    val pointsWithinEps = distanceBetweenPoints.filter { case ((x, y), dist) => dist <= eps * eps }
    

    You could also fold the distance calculation into the filter if you don't need the actual distances afterwards, as in the sketch below.
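
    A minimal sketch of that variant, reusing the Breeze squaredDistance from the question (the names neighborPairs and neighborsPerPoint below are just for illustration; eps is squared because squaredDistance skips the square root):

    val neighborPairs = points.cartesian(points)
        .filter { case (x, y) => x != y && squaredDistance(x, y) <= eps * eps }

    To get each point's neighbors, which is what the question asks for, you can then group the surviving pairs by their first element; the cartesian product already contains both (a, b) and (b, a), so every point appears as a key. This assumes the Breeze vectors behave as RDD keys (consistent equals/hashCode); if in doubt, key the points by an index from zipWithIndex instead.

    // each key is a point, each value is the collection of points within eps of it
    val neighborsPerPoint = neighborPairs.groupByKey()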