scala, apache-spark, spark-graphx

How to print a val from inside partitionBy


I have a problem in Apache Spark GraphX. I tried to partition a graph with this method in my main:

graph.partitionBy(HDRF, 128)

HDRF is my custom partitioning strategy. I would like to print a val that is inside it, but when I try, nothing gets printed.

EDIT:

package app

import org.apache.spark.graphx._
import org.apache.spark._
import org.apache.spark.rdd.RDD


/**
  * Application entry point
  */

object Main{


  def main(args: Array[String]) {

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("HDRF"))

    // show logs only in case of errors
    sc.setLogLevel("ERROR")

    // parse the input text file into an RDD of edges
    val edges: RDD[Edge[String]] =
      sc.textFile("data/u1.base").map { line =>
        val fields = line.split("\t")
        Edge(fields(0).toLong, fields(1).toLong, fields(2))
      }

    val graph: Graph[Any, String] = Graph.fromEdges(edges, "defaultProperty")

    graph.partitionBy(HDRF, 128)


  }
}

And this is the HDRF strategy:

package app

import org.apache.spark.graphx._
import scala.collection.concurrent.TrieMap

object HDRF extends PartitionStrategy {
  private var init = 0 //you can use this to run an initialization phase only the first time

  private var partitionsLoad: Array[Long] = Array.empty[Long] //load (number of edges) of each partition
  private val vertexIdListPartitions: TrieMap[Long, List[Long]] = TrieMap() //list of partitions associated with each vertex
  private val vertexIdEdges: TrieMap[Long, Long] = TrieMap() //degree of each vertex

  private var edges = 0

  private var sum: Long = 0

  override def getPartition(src:VertexId,dst:VertexId,numParts:Int): PartitionID ={
    var valoreMax:Long =Int.MaxValue
    var partScarica:Int = -1
    var c:Int = 0
    if(init==0){
      init=1
      partitionsLoad=Array.fill[Long](numParts)(0)
    }


    //UPDATE THE KNOWN DEGREE OF VERTICES src AND dst IN vertexIdEdges
    vertexIdEdges(src)=vertexIdEdges(src)+1
    vertexIdEdges(dst)=vertexIdEdges(dst)+1
    sum=vertexIdEdges(src) + vertexIdEdges(dst)

    //PARTITION THE GRAPH
    if((!vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
      //NEITHER VERTEX HAS EVER BEEN ASSIGNED TO ANY PARTITION
      //PICK THE LEAST LOADED PARTITION AND ASSIGN BOTH OF THEM TO IT
      while(c==numParts){
        if(partitionsLoad(c)<valoreMax){
          valoreMax=partitionsLoad(c)
          partScarica=c
        }
        c=c+1
      }
      if(partScarica != -1) {
        partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
        vertexIdListPartitions(partScarica).union(List(src, dst))
      }
      return partScarica

    }else if(((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst)))||((!vertexIdListPartitions.contains(src))&&(vertexIdListPartitions.contains(dst)))){
      //ONLY ONE OF THE TWO VERTICES IS ALREADY PRESENT IN AT LEAST ONE PARTITION
      if((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
        //IT IS src
        //PICK THE LEAST LOADED PARTITION AMONG THOSE THAT CONTAIN src AND REPLICATE dst THERE
        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(src)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(dst))
        }

      }else{
        //IT IS dst
        //PICK THE LEAST LOADED PARTITION AMONG THOSE THAT CONTAIN dst AND REPLICATE src THERE

        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(src)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(src))
        }

      }
    }else if(!vertexIdListPartitions(src).intersect(vertexIdListPartitions(dst)).isEmpty){
      //BOTH VERTICES ARE ALREADY PRESENT IN SOME PARTITIONS AND THE INTERSECTION OF THEIR SETS IS NON-EMPTY (I.E. AT LEAST ONE PARTITION CONTAINS BOTH)
      //PICK THE LEAST LOADED PARTITION WITHIN THAT INTERSECTION

      while(c==numParts) {
        if (partitionsLoad(c) < valoreMax) {
          if (vertexIdListPartitions(c).contains(src) && vertexIdListPartitions(c).contains(dst)) {
            valoreMax = partitionsLoad(c)
            partScarica = c
          }
        }
        c = c + 1
      }
      if(partScarica != -1) {
        partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
        vertexIdListPartitions(partScarica).union(List(src))
      }

    }else {
      //BOTH VERTICES ARE ALREADY PRESENT IN SOME PARTITIONS BUT THE INTERSECTION OF THEIR SETS IS EMPTY (I.E. NO PARTITION CONTAINS BOTH)
      if((vertexIdEdges(src))>=(vertexIdEdges(dst))){
        //AMONG THE PARTITIONS dst IS ASSIGNED TO, PICK THE LEAST LOADED ONE AND COPY src THERE

        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(dst)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(src))
        }

      }else{
        //AMONG THE PARTITIONS src IS ASSIGNED TO, PICK THE LEAST LOADED ONE AND COPY dst THERE

        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(src)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(dst))
        }
      }

    }
    edges=edges+1
    if(edges==80000) {
      print(sum)
    }
    return partScarica
  }
}

I need to print sum, but I don't understand why it does not appear.


Solution

  • partitionBy, like many Graph functions, is a lazily-evaluated operation that generates a new Graph object, but doesn't actually compute that Graph until it's necessary - i.e. until some action is performed on the result (e.g. counting, persisting, or collecting it).

    Using a simpler example we can see that if we act on the result, these prints will be visible:

    object SimpleExample extends PartitionStrategy {
      override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
        println("partitioning!")
        0 // any valid partition id in [0, numParts) works for this demo
      }
    }
    
    val result = graph.partitionBy(SimpleExample, 128) // nothing printed so far...
    
    result.edges.count() // now that we act on the result,
    // we see "partitioning!" printed (several times).
    

    NOTE that printing from a PartitionStrategy (or from any transformation function passed to Spark to run on an RDD, a Graph, or a Dataset) is not very helpful anyway: these functions are executed on the worker nodes, so the prints are "scattered" across the outputs of different processes on different machines, and will most likely NOT be visible in the output of the driver application (your main function).
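
    If you actually need a value like sum back on the driver, one option is a Spark accumulator. The sketch below is only illustrative: the AccumulatingStrategy class and the edgesSeen name are invented for this example (they are not part of GraphX), it assumes the sc and graph values from the question's main, and accumulator updates made inside transformations can be over-counted if Spark retries a task.

    import org.apache.spark.graphx._
    import org.apache.spark.util.LongAccumulator

    // Hypothetical strategy that counts, in an accumulator, how many edges it has partitioned.
    class AccumulatingStrategy(acc: LongAccumulator) extends PartitionStrategy {
      override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
        acc.add(1L)                 // executed on the workers
        (src % numParts).toInt      // any valid partition id in [0, numParts)
      }
    }

    val edgesSeen = sc.longAccumulator("edgesSeen")
    val partitioned = graph.partitionBy(new AccumulatingStrategy(edgesSeen), 128)
    partitioned.edges.count()       // an action forces the partitioning to actually run
    println(edgesSeen.value)        // the accumulated value can now be read on the driver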