When I try to use SparkGraphComputer to count the vertices of a Titan graph on a cluster, I get an error that I have no idea how to deal with. I am using TinkerPop 3.1.1-incubating and Titan 1.1.0-SNAPSHOT in my code, and on the cluster I have installed DataStax Community Edition 2.1.11 and Spark 1.5.2-bin-hadoop2.6.
I have put together a minimal Java example to reproduce my problem:
private void strippedDown() {
    // a normal titan cluster
    String titanClusterConfig = "titan-cassandra-test-cluster.properties";

    // a hadoop graph with cassandra as input and gryo as output
    String sparkClusterConfig = "titan-cassandra-test-spark.properties";

    String edgeLabel = "blank";

    // add a graph
    int n = 100;
    Graph titanGraph = GraphFactory.open(titanClusterConfig);
    Vertex superNode = titanGraph.addVertex(T.label, String.valueOf(0));
    for (int i = 1; i < n; i++) {
        Vertex currentNode = titanGraph.addVertex(T.label, String.valueOf(i));
        currentNode.addEdge(edgeLabel, superNode);
    }
    titanGraph.tx().commit();

    // count with titan
    Long count = titanGraph.traversal().V().count().next();
    System.out.println("The number of vertices in the graph is: " + count);

    // count the graph using titan graph computer
    count = titanGraph.traversal(GraphTraversalSource.computer(FulgoraGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: " + count);

    // count the graph using spark graph computer
    Graph sparkGraph = GraphFactory.open(sparkClusterConfig);
    count = sparkGraph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: " + count);
}
Both the OLTP count and the OLAP count with FulgoraGraphComputer return the correct answer. The OLAP count with SparkGraphComputer, however, throws org.apache.spark.SparkException: Job aborted due to stage failure:
Interestingly, if I run a similar script through the Gremlin console packaged with Titan, I get a different error for what seems to be the same algorithm:
graph = GraphFactory.open('titan-cassandra-test-cluster.properties')
graph.addVertex(T.label,"0")
graph.addVertex(T.label,"1")
graph.addVertex(T.label,"2")
graph.tx().commit()
sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()
This throws org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_keyspace_args(keyspace:null)
twice, but the traversal completes and returns 0, which is incorrect.
I am aware of this article on the mailing list, but I am having trouble understanding it and solving the issue. Could anyone explain to me what is happening and how to fix it? I have pasted my configs below.
gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=cassandrathrift
storage.hostname=node1
storage.cassandra.keyspace=mindmapstest
storage.cassandra.replication-factor=3
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5
and
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=none
####################################
# Cassandra Cluster Config #
####################################
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=spark://node1:7077
spark.executor.memory=250m
spark.serializer=org.apache.spark.serializer.KryoSerializer
####################################
# Apache Cassandra InputFormat configuration
####################################
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
EDIT: This script will reproduce the error:
graph = TitanFactory.open('titan-cassandra-test-cluster.properties')
superNode = graph.addVertex(T.label,"0")
for (i in 1..100) {
    currentNode = graph.addVertex(T.label, i.toString())
    currentNode.addEdge("blank", superNode)
}
graph.tx().commit()
graph.traversal().V().count()
graph.traversal(computer()).V().count()
sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()
I am not 100% sure that the reasons I give here are right, but I have managed to solve the issue. My problem stemmed from three underlying issues, all of them related to configuration.
1) The first problem was kindly solved by Jason: I did not have the correct configuration options for connecting to Cassandra. I am still curious what these options actually do.
2) The Java code would not run successfully because I had not set the HADOOP_GREMLIN_LIBS environment variable correctly, so the jars were not being deployed to the cluster for use by the graph computer. Once this was set, the Gremlin console and the Java example had the same problem: both returned a count of zero.
3) The count of zero was the hardest to solve, and again it came down to my not understanding the manual. There were many references to having Hadoop installed on the cluster, but nowhere did it say how to connect to Hadoop on the cluster. Doing so required an extra configuration option, fs.defaultFS, which tells Gremlin where to find the Hadoop filesystem on the cluster. Once this was set correctly, the vertex count was correct.
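For point 2, a small pre-flight check can catch a missing or mistyped HADOOP_GREMLIN_LIBS before a job is submitted. This is only a minimal sketch and not part of TinkerPop or Titan: the class name GremlinLibsCheck and the method problems are hypothetical, and it assumes the variable holds a colon-separated list of local directories.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight helper; not part of TinkerPop or Titan.
final class GremlinLibsCheck {

    // Returns human-readable problems found in a HADOOP_GREMLIN_LIBS value.
    // Assumption: the value is a colon-separated list of local directories.
    static List<String> problems(String libs) {
        List<String> found = new ArrayList<>();
        if (libs == null || libs.trim().isEmpty()) {
            found.add("HADOOP_GREMLIN_LIBS is not set");
            return found;
        }
        for (String entry : libs.split(":")) {
            if (entry.isEmpty()) {
                continue; // ignore stray separators such as "a::b"
            }
            if (!new File(entry).isDirectory()) {
                found.add("not a directory: " + entry);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<String> found = problems(System.getenv("HADOOP_GREMLIN_LIBS"));
        if (found.isEmpty()) {
            System.out.println("HADOOP_GREMLIN_LIBS looks usable");
        } else {
            found.forEach(System.out::println);
        }
    }
}
```

Running this in the same shell or IDE run configuration that launches the Java example shows whether that process actually sees the variable, which can differ from what an interactive terminal reports.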
My theory is that the computation executed correctly, but when the time came to reduce the counts from the Spark workers, the results were persisted somewhere on the cluster. Then, when the answer was returned to the console, the local filesystem was searched and nothing was found, so zero was returned. Perhaps this is a bug?
Anyway, the final configuration file I needed is this:
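If that theory holds, a properties file without fs.defaultFS leaves Hadoop on its default value, file:///, which is the local filesystem. The sketch below checks which case a given set of properties falls into. The class DefaultFsCheck and its method are hypothetical names, and the check inspects only the properties passed to it; it does not consult core-site.xml or any other place Hadoop may read the setting from.

```java
import java.net.URI;
import java.util.Properties;

// Hypothetical helper; it only inspects the given properties.
final class DefaultFsCheck {

    // Hadoop's default for fs.defaultFS points at the local filesystem.
    static final String HADOOP_DEFAULT_FS = "file:///";

    // True when the properties would leave results on the local filesystem,
    // either because fs.defaultFS is absent or because it uses the file scheme.
    static boolean usesLocalFilesystem(Properties props) {
        String value = props.getProperty("fs.defaultFS", HADOOP_DEFAULT_FS).trim();
        String scheme = URI.create(value).getScheme();
        return scheme == null || scheme.equals("file");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println("unset: local = " + usesLocalFilesystem(props));

        // Value taken from the final configuration in this answer.
        props.setProperty("fs.defaultFS", "hdfs://node1:9000");
        System.out.println("hdfs:  local = " + usesLocalFilesystem(props));
    }
}
```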
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=/test/output
####################################
# Cassandra Cluster Config #
####################################
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
titanmr.ioformat.cf-name=edgestore
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=spark://node1:7077
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
####################################
# Apache Cassandra InputFormat configuration
####################################
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.keyspace=mindmapstest
cassandra.input.predicate=0c00020b0001000000000b000200000000020003000800047fffffff0000
cassandra.input.columnfamily=edgestore
cassandra.range.batch.size=2147483647
spark.eventLog.enabled=true
####################################
# Hadoop Cluster configuration #
####################################
fs.defaultFS=hdfs://node1:9000
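To see exactly what changed between the failing and the working Spark configuration, the two files can be compared key by key. The following is a rough sketch using only java.util.Properties; the class PropertiesDiff and its methods are hypothetical names, and the inline strings in main are a short excerpt of the two configurations above rather than the full files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for comparing two properties files.
final class PropertiesDiff {

    // Parses properties-file text; lines starting with '#' are comments.
    static Properties parse(String text) {
        Properties props = new Properties();
        try (StringReader reader = new StringReader(text)) {
            props.load(reader);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Maps every key that is new or changed in 'after' to "old -> new",
    // sorted by key. Keys that exist only in 'before' are not reported.
    static Map<String, String> changes(Properties before, Properties after) {
        Map<String, String> result = new TreeMap<>();
        for (String key : after.stringPropertyNames()) {
            String oldValue = before.getProperty(key);
            String newValue = after.getProperty(key);
            if (!newValue.equals(oldValue)) {
                result.put(key, (oldValue == null ? "(absent)" : oldValue) + " -> " + newValue);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Excerpts only; in practice, load both files in full.
        Properties before = parse(
                "gremlin.hadoop.outputLocation=none\n"
              + "spark.executor.memory=250m\n");
        Properties after = parse(
                "gremlin.hadoop.outputLocation=/test/output\n"
              + "spark.executor.memory=1g\n"
              + "fs.defaultFS=hdfs://node1:9000\n");
        changes(before, after).forEach((key, change) -> System.out.println(key + ": " + change));
    }
}
```

A diff like this does not say which of the changed keys were actually necessary; it only narrows down the candidates to test one at a time.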