Tags: pyspark, hdfs, apache-spark-sql, apache-spark-mllib

How to traverse a massive DataFrame pairwise and store the values in an n*n matrix?


Problem Description:

I have a dataset of about 35 million rows and 10 columns.

I want to calculate the distance between every pair of rows with a distance function like distance(row1, row2), and then store the values in a huge matrix.

The total number of operations needed is n(n-1)/2 ≈ (3.5*10^7)^2 / 2 ≈ 6*10^14, which is enormous.

What I've tried:

  1. Upload the data file to HDFS
  2. Read the data as a DataFrame
  3. Call df.collect() to get array1: Array[Row]
  4. Traverse array1 pairwise and calculate the distances
  5. Store each distance(rowi, rowj) in matrix(i, j)

Scala code:

import org.apache.spark.mllib.linalg.Vectors

val array1 = df.collect()
val l = array1.length
for (i <- 0 until l) {
  for (j <- i + 1 until l) {
    // distance() is the row-distance function defined elsewhere
    val vd = Vectors.dense(i, j, distance(array1(i), array1(j)))
  }
}
I want to save each value in a Vector like the above and add it to an RDD/DataFrame.

But the only way I've found is union, which I don't think is good enough.
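For reference, one way to avoid per-element union is to produce all entries in a single transformation, so nothing is built up on the driver. A minimal sketch, where rowsRdd (an RDD[Row] holding the data) is an assumed name and distance is the function above:

// rowsRdd is an assumed name for an RDD[Row] of the data
val indexed = rowsRdd.zipWithIndex().map(_.swap)      // RDD[(Long, Row)]
val entries = indexed.cartesian(indexed)
  .filter { case ((i, _), (j, _)) => i < j }          // upper triangle only
  .map { case ((i, ri), (j, rj)) => (i, j, distance(ri, rj)) }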

So there are three questions that need to be solved:

  1. collect is an action; df.collect() throws java.lang.OutOfMemoryError: Java heap space. Can this be avoided?
  2. As soon as I get a distance(rowi, rowj), I want to store it. How?
  3. Can I store the final matrix in HDFS and read it back as a matrix in Python? (See the sketch after this list.)
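On questions 2 and 3, a common pattern (a sketch, not the only way) is to never hold the matrix in driver memory at all: keep it as an RDD of (i, j, value) entries, like entries above, and write it to HDFS in coordinate (triple) format:

// `entries` is the RDD[(Long, Long, Double)] from the sketch above (assumed)
entries.map { case (i, j, d) => s"$i,$j,$d" }
       .saveAsTextFile("hdfs:///path/to/distance-matrix")   // path is illustrative

In Python, the same files can be read back line by line (or with sc.textFile in PySpark) and split on commas to rebuild the matrix in coordinate form.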

PS: If none of the above can be solved, what other approach could I use?

Any answer will help me a lot, thank you!


Solution

  • Check IndexedRowMatrix: https://spark.apache.org/docs/latest/mllib-data-types.html#indexedrowmatrix. An IndexedRowMatrix is similar to a RowMatrix but with meaningful row indices. You can design your algorithm based on this API; a sketch follows below.
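For illustration, a minimal sketch of that idea (not a definitive implementation): it assumes features is an RDD[Vector] built from the 10 numeric columns, and uses Euclidean distance via Vectors.sqdist as a stand-in for your distance(). The result stays distributed as a CoordinateMatrix instead of being collected to the driver.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, IndexedRowMatrix, MatrixEntry}

// Assumption: `features` is an RDD[Vector] built from the 10 numeric columns
val mat = new IndexedRowMatrix(
  features.zipWithIndex().map { case (v, i) => IndexedRow(i, v) })

val rows = mat.rows.map(r => (r.index, r.vector))
val distEntries = rows.cartesian(rows)
  .filter { case ((i, _), (j, _)) => i < j }        // each unordered pair once
  .map { case ((i, vi), (j, vj)) =>
    // Euclidean distance as a placeholder for your distance()
    MatrixEntry(i, j, math.sqrt(Vectors.sqdist(vi, vj))) }

// Distributed sparse (i, j, value) representation; never collected to the driver
val distMatrix = new CoordinateMatrix(distEntries)

Be warned that at n ≈ 3.5*10^7 the full result still has about 6*10^14 entries, far more than any cluster can store, so in practice you would filter the entries (for example, keep only distances below a threshold) before materializing or saving them.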