I have a problem with Apache Spark GraphX. I tried to partition a graph with this call in main:
graph.partitionBy(HDRF, 128)
HDRF is my custom partitioning strategy. I would like to print a val that is defined inside it, but when I try, nothing is printed.
/EDIT/
package app
import org.apache.spark.graphx._
import org.apache.spark._
import org.apache.spark.rdd.RDD
/**
* Application entry point
*/
object Main{
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("HDRF"))
// show only error-level logs
sc.setLogLevel("ERROR")
// parse the input text file into edges
val edges:RDD[Edge[String]]=
sc.textFile("data/u1.base").map{ line =>
val fields= line.split("\t")
Edge(fields(0).toLong,fields(1).toLong,fields(2))
}
val graph: Graph[Any,String] =Graph.fromEdges(edges,"defaultProperty")
graph.partitionBy(HDRF,128)
}
}
.
package app
import org.apache.spark.graphx._
import scala.collection.concurrent.TrieMap
object HDRF extends PartitionStrategy{
private var init=0; // can be used to guard an initialization phase that runs only the first time
private var partitionsLoad:Array[Long] = Array.empty[Long] // load (number of edges) of each partition
private val vertexIdListPartitions: TrieMap[Long, List[Long]] = TrieMap() // list of partitions associated with each vertex
private val vertexIdEdges: TrieMap[Long, Long] = TrieMap() // degree of each vertex
private var edges = 0
private var sum :Long= 0
override def getPartition(src:VertexId,dst:VertexId,numParts:Int): PartitionID ={
var valoreMax:Long =Int.MaxValue
var partScarica:Int = -1
var c:Int = 0
if(init==0){
init=1
partitionsLoad=Array.fill[Long](numParts)(0)
}
//UPDATE THE KNOWN DEGREE OF VERTICES src AND dst IN THE VARIABLE vertexIdEdges
vertexIdEdges(src)=vertexIdEdges(src)+1
vertexIdEdges(dst)=vertexIdEdges(dst)+1
sum=vertexIdEdges(src) + vertexIdEdges(dst)
//PARTITION THE GRAPH
if((!vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
//NEITHER OF THE TWO VERTICES HAS EVER BEEN PLACED IN ANY PARTITION
//CHOOSE THE LEAST LOADED PARTITION AND ASSIGN BOTH TO IT
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
valoreMax=partitionsLoad(c)
partScarica=c
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src, dst))
}
return partScarica
}else if(((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst)))||((!vertexIdListPartitions.contains(src))&&(vertexIdListPartitions.contains(dst)))){
//ONLY ONE OF THE TWO VERTICES IS ALREADY PRESENT IN AT LEAST ONE PARTITION
if((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
//IT IS src
//CHOOSE THE LEAST LOADED PARTITION AMONG THOSE CONTAINING src AND REPLICATE dst THERE
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(src)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(dst))
}
}else{
//IT IS dst
//CHOOSE THE LEAST LOADED PARTITION AMONG THOSE CONTAINING dst AND REPLICATE src THERE
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(src)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src))
}
}
}else if(!vertexIdListPartitions(src).intersect(vertexIdListPartitions(dst)).isEmpty){
//BOTH VERTICES ARE PRESENT IN SEVERAL PARTITIONS AND THE INTERSECTION OF THEIR SETS IS NOT EMPTY (I.E. AT LEAST ONE PARTITION CONTAINS BOTH)
//CHOOSE THE LEAST LOADED PARTITION WITHIN THE INTERSECTION OF THE SETS
while(c==numParts) {
if (partitionsLoad(c) < valoreMax) {
if (vertexIdListPartitions(c).contains(src) && vertexIdListPartitions(c).contains(dst)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c = c + 1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src))
}
}else {
//BOTH VERTICES ARE PRESENT IN SEVERAL PARTITIONS BUT THE INTERSECTION OF THEIR SETS IS EMPTY (I.E. NO PARTITION CONTAINS BOTH)
if((vertexIdEdges(src))>=(vertexIdEdges(dst))){
//AMONG THE PARTITIONS dst IS ASSIGNED TO, CHOOSE THE LEAST LOADED ONE AND COPY src THERE
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(dst)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src))
}
}else{
//AMONG THE PARTITIONS src IS ASSIGNED TO, CHOOSE THE LEAST LOADED ONE AND COPY dst THERE
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(src)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(dst))
}
}
}
edges=edges+1
if(edges==80000) {
print(sum)
}
return partScarica
}
}
I need to print sum, but I don't understand why it does not appear.
partitionBy, like many Graph functions, is a lazily-evaluated operation that generates a new Graph object, but doesn't actually compute that Graph until it's necessary, i.e. until some action is performed on the result (e.g. counting, collecting, or saving it).
Using a simpler example we can see that if we act on the result, these prints will be visible:
object SimpleExample extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    println("partitioning!")
    0 // send every edge to partition 0; valid partition IDs are 0 until numParts
  }
}
val result = graph.partitionBy(SimpleExample, 128) // nothing printed so far...
result.edges.count() // now that we act on the result,
                     // we see "partitioning!" printed (several times).
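The same deferred behaviour can be seen without Spark. The sketch below is only a loose analogy, not GraphX code: a plain Scala Iterator stands in for the lazily-built Graph, and the names LazyAnalogy and run are made up for illustration.

```scala
object LazyAnalogy {
  // Returns (calls before consuming, calls after consuming, sum of results).
  def run(): (Int, Int, Int) = {
    var calls = 0
    // Like partitionBy: this only describes the work, nothing runs yet.
    val mapped = Iterator(1, 2, 3).map { x => calls += 1; x * 2 }
    val before = calls
    // Like count(): consuming the iterator forces the function to run.
    val total = mapped.sum
    (before, calls, total)
  }

  def main(args: Array[String]): Unit = {
    val (before, after, total) = run()
    println(s"calls before consuming: $before, after: $after, sum: $total")
  }
}
```

The function passed to map is not invoked until the iterator is consumed, just as getPartition is not invoked until an action runs on the partitioned graph.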
NOTE that printing from a PartitionStrategy (or any transformation function passed to Spark to be performed on an RDD, a Graph, or a Dataset) is not too helpful: these functions are executed on the worker nodes, hence these prints will be "scattered" in outputs of different processes on different machines, and would probably NOT be visible in the output of the driver application (your main function). Only in local mode, where the driver and the workers share one JVM, do they end up in the same console.
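Rather than printing inside the strategy, one option is to compute the numbers you care about from the partitioned graph and collect them on the driver. The following is a minimal sketch under assumptions, not the HDRF logic from the question: BySource is a hypothetical stand-in strategy, edgesPerPartition and PartitionReport are made-up names, and the four edges are toy data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object PartitionReport {
  // Hypothetical stand-in strategy: places each edge by its source vertex id.
  object BySource extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
      (((src % numParts) + numParts) % numParts).toInt
  }

  // Counts edges inside each partition on the workers, then collects the
  // (partition index, edge count) pairs back to the driver.
  def edgesPerPartition[VD, ED](graph: Graph[VD, ED]): Array[(Int, Int)] =
    graph.edges
      .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
      .collect()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("PartitionReport"))
    sc.setLogLevel("ERROR")

    // Toy data, assumed for illustration only.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, "a"), Edge(2L, 3L, "b"), Edge(3L, 1L, "c"), Edge(1L, 4L, "d")))
    val graph = Graph.fromEdges(edges, "defaultProperty")

    val partitioned = graph.partitionBy(BySource, 4)
    // collect() is an action, so the strategy runs here and the printing
    // below happens in the driver, where it is always visible.
    edgesPerPartition(partitioned).foreach { case (idx, n) =>
      println(s"partition $idx: $n edges")
    }
    sc.stop()
  }
}
```

Because the println calls run on the collected array, they appear in the driver output regardless of where the workers run.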