scala, apache-flink, gelly

Flink: PageRank type mismatch error


I want to compute PageRank from a CSV file of edges formatted as follows:

12,13,1.0
12,14,1.0
12,15,1.0
12,16,1.0
12,17,1.0
...

My code:

var filename = "<filename>.csv"

val graph = Graph.fromCsvReader[Long,Double,Double]( 
                   env = env, 
                   pathEdges = filename, 
                   readVertices = false, 
                   hasEdgeValues = true, 
                   vertexValueInitializer = new MapFunction[Long, Double] { 
                           def map(id: Long): Double = 0.0 } )

val ranks = new PageRank[Long](0.85, 20).run(graph)

I get the following error from the Flink Scala Shell:

error: type mismatch;
 found   : org.apache.flink.graph.scala.Graph[Long,_23,_24] where type _24 >: Double with _22, type _23 >: Double with _21
 required: org.apache.flink.graph.Graph[Long,Double,Double]
            val ranks = new PageRank[Long](0.85, 20).run(graph)
                                                         ^

What am I doing wrong?

(Also, are the initial values of 0.0 for every vertex and 1.0 for every edge correct?)


Solution

  • The problem is that you are passing a Scala org.apache.flink.graph.scala.Graph to PageRank.run, which expects a Java org.apache.flink.graph.Graph.

    To run a GraphAlgorithm on a Scala Graph object, pass the algorithm to the run method of the Scala Graph instead:

    graph.run(new PageRank[Long](0.85, 20))
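
To illustrate why the direction of the call matters, here is a minimal sketch that does not use Flink at all. It assumes the Scala Graph is a wrapper that holds a Java Graph rather than a subtype of it. The names JavaGraph, ScalaGraph, Algorithm and CountVertices are hypothetical stand-ins, not Gelly's real classes.

```scala
// Hypothetical stand-in for the Java-side graph type
// (not org.apache.flink.graph.Graph).
final class JavaGraph[K](val vertices: Seq[K]) {
  def run[T](algorithm: Algorithm[K, T]): T = algorithm.run(this)
}

// Hypothetical stand-in for a graph algorithm: it only knows the Java-side type.
trait Algorithm[K, T] {
  def run(graph: JavaGraph[K]): T
}

// Hypothetical stand-in for the Scala wrapper: it holds a JavaGraph
// but is not a subtype of it.
final class ScalaGraph[K](private val jgraph: JavaGraph[K]) {
  // Unwraps the Java graph and hands it to the algorithm.
  def run[T](algorithm: Algorithm[K, T]): T = jgraph.run(algorithm)
}

// A trivial algorithm used only for this illustration.
final class CountVertices[K] extends Algorithm[K, Int] {
  def run(graph: JavaGraph[K]): Int = graph.vertices.size
}

object WrapperDemo {
  def main(args: Array[String]): Unit = {
    val graph = new ScalaGraph(new JavaGraph(Seq(12L, 13L, 14L)))

    // new CountVertices[Long].run(graph)
    // does not compile: type mismatch, ScalaGraph is not a JavaGraph

    // Calling run on the wrapper works because it does the unwrapping.
    val count = graph.run(new CountVertices[Long])
    println(count)
  }
}
```

The commented-out line has the same shape as the call in the question, and the working call has the same shape as graph.run(new PageRank[Long](0.85, 20)).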
    

    Update

    For the PageRank algorithm specifically, note that it expects an instance of type Graph[K, java.lang.Double, java.lang.Double]. Because the type checker treats Java's Double and Scala's Double as distinct types, the vertex and edge value types of the graph have to be declared as java.lang.Double.

    For the example code, this means:

    val graph = Graph.fromCsvReader[Long,java.lang.Double,java.lang.Double]( 
      env = env, 
      pathEdges = filename, 
      readVertices = false, 
      hasEdgeValues = true, 
      vertexValueInitializer = new MapFunction[Long, java.lang.Double] { 
             def map(id: Long): java.lang.Double = 0.0 } )
      .asInstanceOf[Graph[Long, java.lang.Double, java.lang.Double]]
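
The difference between the two Double types can be seen without Flink. The sketch below is only an illustration: BoxedDoubleDemo and sumBoxed are hypothetical names. It shows that a single Scala Double is converted to java.lang.Double implicitly, while a type parameterized with Double is not accepted where java.lang.Double is required.

```scala
object BoxedDoubleDemo {
  // Accepts only boxed Java doubles, similar in spirit to an API that
  // requires java.lang.Double as a type parameter.
  def sumBoxed(values: List[java.lang.Double]): Double =
    values.map(_.doubleValue).sum

  def main(args: Array[String]): Unit = {
    // A single value is converted implicitly (via Predef.double2Double),
    // which is why "= 0.0" works in the vertex value initializer above.
    val boxed: java.lang.Double = 0.85
    println(boxed)

    val scalaValues: List[Double] = List(1.0, 1.0, 1.0)

    // sumBoxed(scalaValues)
    // does not compile: List[Double] is not a List[java.lang.Double]

    // Converting element by element yields the required type.
    val javaValues: List[java.lang.Double] =
      scalaValues.map(d => java.lang.Double.valueOf(d))
    println(sumBoxed(javaValues))
  }
}
```

This is why the type parameters of Graph.fromCsvReader are spelled out as java.lang.Double in the corrected code, rather than relying on a conversion from Double.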