Tags: scala, apache-spark, sparse-matrix, apache-spark-mllib

How to create a distributed sparse matrix in Spark from DataFrame in Scala


Question

Please help me find ways to create a distributed matrix from the (user, feature, value) records in a DataFrame, where the features and their values are stored in a column.

An excerpt of the data is below, but there are a large number of users and features, and not all features are tested for every user. Hence lots of feature values are null and need to be imputed to 0.

For instance, a blood test may have sugar level, cholesterol level, etc. as features. If those levels are not acceptable, then 1 is set as the value. But not all of the features will be tested for every user (or patient).

+----+-------+-----+
|user|feature|value|
+----+-------+-----+
|  14|      0|    1|
|  14|    222|    1|
|  14|    200|    1|
|  22|      0|    1|
|  22|     32|    1|
|  22|    147|    1|
|  22|    279|    1|
|  22|    330|    1|
|  22|    363|    1|
|  22|    162|    1|
|  22|    811|    1|
|  22|    290|    1|
|  22|    335|    1|
|  22|    681|    1|
|  22|    786|    1|
|  22|    789|    1|
|  22|    842|    1|
|  22|    856|    1|
|  22|    881|    1|
+----+-------+-----+

If the features were already columns, there are known ways to do this.

But this is not the case here. So one way could be pivoting the DataFrame so that those methods can be applied (a sketch follows the table):

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|  22|  1|  1|  1|  1|  0|  0|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
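
A minimal sketch of such a pivot, assuming the source DataFrame is named featureData (the name also used in the solution below) with the columns shown above:

    import org.apache.spark.sql.functions.first

    val pivoted = featureData
        .groupBy("user")
        .pivot("feature")      // one column per distinct feature id
        .agg(first("value"))
        .na.fill(0)            // impute the missing feature values with 0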

Then use a row-to-vector conversion. I suppose using one of these (a sketch of the last option follows the list):

  • VectorAssembler
  • org.apache.spark.mllib.linalg.Vectors.fromML
  • org.apache.spark.mllib.linalg.distributed.MatrixEntry
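
For instance, a minimal sketch using the last option, which skips the pivot entirely by mapping the (user, feature, value) rows straight into a CoordinateMatrix (entries absent from the data are implicitly 0 in the sparse representation):

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    val entries = featureData.rdd.map(r =>
        MatrixEntry(r.getInt(0).toLong, r.getInt(1).toLong, r.getInt(2).toDouble))
    val coordMatrix = new CoordinateMatrix(entries)  // distributed sparse matrix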

However, since many null values would have to be imputed to 0, the pivoted DataFrame would consume far more memory. Also, pivoting a large DataFrame distributed among multiple nodes would cause a large shuffle.

Hence, I am seeking advice, ideas, and suggestions.


Environment

Spark 2.4.4


Solution


    1. Create an RDD[(user, feature)] from each input line.
    2. groupByKey to create an RDD[(user, [feature+])].
    3. Create an RDD[IndexedRow] where each IndexedRow spans all the existing features, as below.
    +----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
    |user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
    +----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
    |  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
    
    4. Convert the RDD[IndexedRow] into an IndexedRowMatrix.

    For the product operation, convert the IndexedRowMatrix into a BlockMatrix, which supports multiplication in a distributed manner.

    Convert each original record into IndexedRow

    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.linalg.distributed._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.functions.max
    
    def toIndexedRow(userToFeaturesMap: (Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
        userToFeaturesMap match {
            case (userId, featureIDs) => {
                // 1.0 at every feature id the user has; all other entries are implicit 0.0
                val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
                new IndexedRow(
                    userId,
                    Vectors.sparse(maxFeatureId + 1, featureCountKV)
                )
            }
        }
    }
    
    // Derive the sparse vector size from the largest feature id in the data
    val maxFeatureId = featureData.agg(max("feature")).first.getInt(0)

    val userToFeatureCounters = featureData.rdd
        .map(rowPF => (rowPF.getInt(0), rowPF.getInt(1)))  // Row -> (userId, featureId)
        .groupByKey()                                      // (userId, Iterable(featureId))
        .map(
            userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
        )                                                  // IndexedRow(userId, sparse vector of 1.0s)
    

    Created IndexedRowMatrix

    val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)
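
    As a quick sanity check (an optional step, not part of the answer above), the dimensions of the distributed matrix can be inspected:

    val m = userFeatureIndexedMatrix.numRows   // number of user rows (max user index + 1)
    val n = userFeatureIndexedMatrix.numCols   // number of feature columns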
    

    Transposed the IndexedRowMatrix via BlockMatrix, as IndexedRowMatrix does not support transpose

    val userFeatureBlockMatrix = userFeatureIndexedMatrix.toBlockMatrix
    val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
        .transpose

    Created the product with BlockMatrix, as IndexedRowMatrix.multiply only accepts a local matrix on the right.

    val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
        .multiply(userFeatureBlockMatrixTransposed)
        .toIndexedRowMatrix
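
    To sanity-check the result, a few rows of the product can be pulled back to the driver:

    featuresTogetherIndexedMatrix.rows.take(5).foreach(println)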