I want to compute PageRank
from a CSV file of edges formatted as follows:
12,13,1.0
12,14,1.0
12,15,1.0
12,16,1.0
12,17,1.0
...
My code:
val filename = "<filename>.csv"
val graph = Graph.fromCsvReader[Long, Double, Double](
  env = env,
  pathEdges = filename,
  readVertices = false,
  hasEdgeValues = true,
  vertexValueInitializer = new MapFunction[Long, Double] {
    def map(id: Long): Double = 0.0
  })
val ranks = new PageRank[Long](0.85, 20).run(graph)
I get the following error from the Flink Scala Shell:
error: type mismatch;
found : org.apache.flink.graph.scala.Graph[Long,_23,_24] where type _24 >: Double with _22, type _23 >: Double with _21
required: org.apache.flink.graph.Graph[Long,Double,Double]
val ranks = new PageRank[Long](0.85, 20).run(graph)
^
What am I doing wrong?
(And are the initial values of 0.0 for every vertex and 1.0 for every edge correct?)
The problem is that you're passing the Scala org.apache.flink.graph.scala.Graph to PageRank.run, which expects the Java org.apache.flink.graph.Graph. To run a GraphAlgorithm on a Scala Graph, call the Scala Graph's run method with the GraphAlgorithm instead:

graph.run(new PageRank[Long](0.85, 20))
In the case of the PageRank algorithm, it is important to note that the algorithm expects an instance of type Graph[K, java.lang.Double, java.lang.Double]. Since Java's Double is a different type from Scala's Double as far as type checking is concerned, this has to be accounted for.
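You can see the distinction outside of Flink with plain Scala (a quick sketch; no Flink types involved):

```scala
// scala.Double compiles to the JVM primitive double, while
// java.lang.Double is a boxed reference type; the compiler treats
// them as distinct types even though values convert implicitly.
val primitive: Double = 1.0
val boxed: java.lang.Double = primitive  // boxed via Predef.double2Double

println(boxed.getClass.getName)         // prints "java.lang.Double"
println(primitive == boxed.doubleValue) // prints "true": equal after unboxing
```

This is why a Graph[Long, Double, Double] does not satisfy a parameter declared as Graph[Long, java.lang.Double, java.lang.Double].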
For the example code this means:

val graph = Graph.fromCsvReader[Long, java.lang.Double, java.lang.Double](
  env = env,
  pathEdges = filename,
  readVertices = false,
  hasEdgeValues = true,
  vertexValueInitializer = new MapFunction[Long, java.lang.Double] {
    def map(id: Long): java.lang.Double = 0.0
  })
  .asInstanceOf[Graph[Long, java.lang.Double, java.lang.Double]]
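Putting both fixes together, the whole pipeline would look roughly like this (untested sketch; `env` is the ExecutionEnvironment provided by the Flink Scala Shell, and the CSV path, damping factor, and iteration count are taken from the question):

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.Graph
import org.apache.flink.graph.library.PageRank

val filename = "<filename>.csv"

// Read the edge list with java.lang.Double values, as PageRank expects.
val graph = org.apache.flink.graph.scala.Graph
  .fromCsvReader[Long, java.lang.Double, java.lang.Double](
    env = env,
    pathEdges = filename,
    readVertices = false,
    hasEdgeValues = true,
    vertexValueInitializer = new MapFunction[Long, java.lang.Double] {
      def map(id: Long): java.lang.Double = 0.0
    })
  .asInstanceOf[org.apache.flink.graph.scala.Graph[Long, java.lang.Double, java.lang.Double]]

// Run the algorithm through the Scala Graph's run method,
// not by calling PageRank.run on the graph directly.
val ranks = graph.run(new PageRank[Long](0.85, 20))
```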