
PageRank using GraphX


I have a .txt file, say list.txt, which consists of a list of source and destination URLs in the format

google.de/2011/10/Extract-host       link.de/2011/10/extact-host
facebook.de/2014/11/photos           facebook.de/2014/11/name.jpg
community.cloudera.com/t5/           community.cloudera.com/t10/
facebook.de/2014/11/photos           link.de/2011/10/extact-host

With the help of this post, How to create a VertexId in Apache Spark GraphX using a Long data type?, I tried to create nodes and edges like this:

val test = sc.textFile("list.txt")                          // runs

val arrayForm = test.map(_.split("\t"))                     // runs

val nodes: RDD[(VertexId, Option[String])] = arrayForm
  .flatMap(array => array)
  .map(url => (url.toLong, None))                           // fails: a URL string cannot be parsed as a Long

val edges: RDD[Edge[String]] = arrayForm
  .map(line => Edge(line(0), line(1), ""))                  // does not compile: Edge expects Long vertex IDs

The problem is that I don't know how to create a VertexId (and, similarly, edges) from a String datatype. Please let me know how to resolve this.


Solution

  • The answer is hashing. Since your vertex IDs are strings, you can hash them with MurmurHash3, build the graph, run PageRank (or whatever you need), and then join the hash values back to the original strings. Note that stringHash returns a 32-bit Int, so collisions are theoretically possible on very large URL sets.
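
    The key property that makes this work is that `MurmurHash3.stringHash` is deterministic: the same string always maps to the same 32-bit Int, whether it appears in the edge list or in the lookup map. A quick plain-Scala check (no Spark needed; the URL is taken from the sample data above):

    ```scala
    import scala.util.hashing.MurmurHash3

    // The same string always hashes to the same Int, so a URL can act
    // as a stable numeric vertex ID across different RDDs.
    val url = "facebook.de/2014/11/photos"
    val id1 = MurmurHash3.stringHash(url)
    val id2 = MurmurHash3.stringHash(url)

    println(id1 == id2)   // always true
    println(id1.toLong)   // widen to Long to match GraphX's VertexId alias
    ```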

    Example code

    package com.void
    
    import org.apache.spark._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.graphx.Graph
    import org.apache.spark.graphx.VertexId
    
    import scala.util.hashing.MurmurHash3
    
    object Main {
    
        def main( args: Array[ String ] ): Unit = {
    
            val conf = 
                new SparkConf()
                .setAppName( "SO Spark" )
                .setMaster( "local[*]" )
                .set( "spark.driver.host", "localhost" )
    
            val sc = new SparkContext( conf )
    
            val file = sc.textFile( "data/pr_data.txt" )
    
            val edgesRDD: RDD[(VertexId, VertexId)] = 
                file
                .map( line => line.split( "\t" ) )
                .map( line => (
                        MurmurHash3.stringHash( line( 0 ) ), MurmurHash3.stringHash( line( 1 ) )
                    )
                )
    
            val graph = Graph.fromEdgeTuples( edgesRDD, 1 )
    
            // graph.triplets.collect.foreach( println )
    
            // println( "####" )
    
            val ranks = 
                graph
                .pageRank( 0.0001 )
                .vertices
    
            ranks.foreach( println )
    
            println( "####" )
    
            val identificationMap = 
                file
                .flatMap( line => line.split( "\t" ) )
                .distinct
                .map( line => ( MurmurHash3.stringHash( line ).toLong, line ) )
    
            identificationMap.foreach( println )
    
            println( "####" )
    
            val fullMap = 
                ranks
                .join( identificationMap )
    
            fullMap.foreach( println )
    
            sc.stop()
        }
    }
    

    Results

    (-1578471469,1.2982456140350878)
    (1547760250,0.7017543859649124)
    (1657711982,1.0000000000000002)
    (1797439709,0.7017543859649124)
    (996122257,0.7017543859649124)
    (-1127017098,1.5964912280701753)
    ####
    (1547760250,community.cloudera.com/t5/)
    (-1127017098,link.de/2011/10/extact-host)
    (1657711982,facebook.de/2014/11/name.jpg)
    (1797439709,facebook.de/2014/11/photos)
    (-1578471469,community.cloudera.com/t10/)
    (996122257,google.de/2011/10/Extract-host)
    ####
    (-1578471469,(1.2982456140350878,community.cloudera.com/t10/))
    (1797439709,(0.7017543859649124,facebook.de/2014/11/photos))
    (1547760250,(0.7017543859649124,community.cloudera.com/t5/))
    (996122257,(0.7017543859649124,google.de/2011/10/Extract-host))
    (1657711982,(1.0000000000000002,facebook.de/2014/11/name.jpg))
    (-1127017098,(1.5964912280701753,link.de/2011/10/extact-host))
    

    You can remove the hashed IDs from the RDD by mapping them out, but since PageRank probably isn't your end goal, you'll likely need them later.
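
    If you do decide to map the hashed IDs out after the join, a pattern match does it. Here is a sketch on a plain Scala `Seq` shaped like `fullMap`'s output (two rows copied from the results above); on the real RDD the same `.map` call applies:

    ```scala
    // Stand-in for fullMap.collect: (hashedId, (rank, url)) tuples.
    val fullMap = Seq(
      (-1578471469, (1.2982456140350878, "community.cloudera.com/t10/")),
      (-1127017098, (1.5964912280701753, "link.de/2011/10/extact-host"))
    )

    // Pattern-match the hashed ID away, keeping only (url, rank) pairs.
    val ranksByUrl = fullMap.map { case (_, (rank, url)) => (url, rank) }

    ranksByUrl.foreach(println)
    ```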