Search code examples
scalaapache-sparkapache-spark-sqlspark-graphx

Spark Graphx: Loading a graph from adjacency matrix


I have been experimenting with the Graphx APIs of Spark, primarily to learn and have a feel of how to use them. In the process, I have to load an adjacency matrix into a graph. The matrix dataset is here.

From the site, the matrix is described as


A number of employees in a factory was interviewed on a question: “Do you like to work with your co-worker?”. Possible answers are 1 for yes and 0 for no. Each employee gave an answer for each other employee thus creating an adjecancy matrix.


So, I have decided to name the employees as English alphabets ("A" onwards). Employees form the nodes of the graph, and their preferences for their co-workers form the edges. I haven't found any straightforward way in Spark to achieve this; my R-programmer friends tell me that it is quite easy to do so, in their world. So, I set upon writing a naive implementation to do so. Here's the code

val conf = new SparkConf().setMaster("local[*]").setAppName("GraphExploration App")
val spark = SparkSession
  .builder()
  .appName("Spark SQL: beginners exercise")
  .getOrCreate()     

    val sc = SparkContext.getOrCreate(conf)

      val df = spark.read.csv("./BlogInputs/sociogram-employees-un.csv").cache

      val allRows = df.toLocalIterator.toIndexedSeq

      type EmployeeVertex = (Long,String)

      val employeesWithNames = (0 until allRows.length).map(i => (i.toLong,((i + 'A').toChar.toString())))

      val columnNames   = (0 until allRows.length).map(i => ("_c" + i)).toIndexedSeq // It is a square matrix; rows == columns

      val edgesAsCollected = (for {
            rowIndex <- 0 until df.count.toInt
            colIndex <- 0 until df.count.toInt
            if (rowIndex != colIndex)
            } yield {

                    if (allRows(rowIndex).fieldIndex(columnNames(colIndex)) == 1)
                        Some(Edge(employeesWithNames(rowIndex)._1,employeesWithNames(colIndex)._1,"Likes"))
                    else
                       None

            }).flatten

       val employeeNodes = sc.parallelize(employeesWithNames)
       val edges = sc.parallelize(edgesAsCollected)

       val employeeGraph = Graph(sc.parallelize(employeesWithNames),edges,"Nobody")

Here is the schema:

scala>df.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)

.. and first few rows here

scala> df.show
16/12/21 07:12:00 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_8_0]
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0|  1|  0|  1|  1|  0|  1|  1|  1|  0|   0|   1|   0|   1|   1|   0|   1|   1|   0|   1|   0|   1|   0|   1|   1|
|  1|  0|  0|  1|  0|  0|  1|  0|  1|  0|   0|   1|   0|   0|   1|   0|   1|   0|   1|   0|   0|   1|   0|   1|   0|
|  0|  1|  0|  1|  1|  0|  0|  0|  1|  0|   0|   0|   0|   1|   1|   0|   0|   1|   0|   0|   0|   1|   1|   0|   1|
|  0|  1|  1|  0|  0|  0|  1|  0|  0|  0|   1|   1|   0|   1|   0|   0|   1|   1|   0|   0|   1|   0|   1|   1|   0|

This serves my purpose, but I feel there may be a different way. My very little knowledge of Spark's MLLib APIs is perhaps a barrier. Could someone please comment on this? Better even, could someone show me a better yet simple way (by editing my code, if necessary)?


Solution

  • I find @DanieldePaula's suggestion acceptable as an answer, for the case at hand:

    As the matrix is square, a very large number of rows would imply a very large number of columns, in which case using SparkSQL wouldn't seem optimal in my opinion. I think you can use Spark for this problem if the matrix is converted into a Sparse format, e.g. RDD[(row, col, value)], then it would be very easy to create your vertices and edges.

    Thanks, Daniel!