apache-spark titan tinkerpop3

Counting vertices on a Titan graph using SparkGraphComputer throws org.apache.spark.SparkException: Job aborted due to stage failure


When I try to use SparkGraphComputer to count the vertices of a Titan graph on a cluster, I get an error that I do not know how to deal with. My code uses TinkerPop 3.1.1-incubating and Titan 1.1.0-SNAPSHOT; the cluster runs DataStax Community Edition 2.1.11 and Spark 1.5.2-bin-hadoop2.6.

I have put together a minimal Java example to reproduce my problem:

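import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraGraphComputer;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

private void strippedDown() {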
    // a normal titan cluster
    String titanClusterConfig = "titan-cassandra-test-cluster.properties";

    // a hadoop graph with cassandra as input and gryo as output
    String sparkClusterConfig = "titan-cassandra-test-spark.properties";

    String edgeLabel = "blank";

    // add a graph
    int n = 100;
    Graph titanGraph = GraphFactory.open(titanClusterConfig);
    Vertex superNode = titanGraph.addVertex(T.label, String.valueOf(0));
    for (int i=1;i<n;i++) {
        Vertex currentNode = titanGraph.addVertex(T.label, String.valueOf(i));
        currentNode.addEdge(edgeLabel,superNode);
    }
    titanGraph.tx().commit();

    //count with titan
    Long count = titanGraph.traversal().V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);

    // count the graph using titan graph computer
    count = titanGraph.traversal(GraphTraversalSource.computer(FulgoraGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);

    // count the graph using spark graph computer
    Graph sparkGraph = GraphFactory.open(sparkClusterConfig);
    count = sparkGraph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);
}

The OLTP count and the OLAP count with FulgoraGraphComputer both return the correct answer. The OLAP count with SparkGraphComputer, however, throws org.apache.spark.SparkException: Job aborted due to stage failure.

Interestingly, if I run a similar script through the Gremlin console packaged with Titan, I get a different error for what seems to be the same algorithm:

graph = GraphFactory.open('titan-cassandra-test-cluster.properties')
graph.addVertex(T.label,"0")
graph.addVertex(T.label,"1")
graph.addVertex(T.label,"2")
graph.tx().commit()
sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()

This throws org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_keyspace_args(keyspace:null) twice, but the job still completes and returns 0, which is incorrect.

I am aware of this article on the mailing list, but I am having trouble understanding it and using it to solve the issue. Could anyone explain what is happening and how to fix it? My two configuration files are pasted below.

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=cassandrathrift
storage.hostname=node1
storage.cassandra.keyspace=mindmapstest
storage.cassandra.replication-factor=3
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5

and

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=none
####################################
# Cassandra Cluster Config         #
####################################
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=spark://node1:7077
spark.executor.memory=250m
spark.serializer=org.apache.spark.serializer.KryoSerializer
####################################
# Apache Cassandra InputFormat configuration
####################################
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

EDIT: The following Gremlin console script reproduces the error:

graph = TitanFactory.open('titan-cassandra-test-cluster.properties')
superNode = graph.addVertex(T.label,"0")
for(i in 1..100) {
  currentNode = graph.addVertex(T.label,i.toString())
  currentNode.addEdge("blank",superNode)
}
graph.tx().commit()

graph.traversal().V().count()

graph.traversal(computer()).V().count()

sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()

Solution

  • I am not 100% sure that the reasons I give here are right, but I have managed to solve the issue. My problem stemmed from three underlying issues, all of them related to configuration.

    1) The first problem, which Jason kindly solved, was that the configuration options for connecting to Cassandra were incomplete. I am still not sure what each of these options actually does.

    2) The Java code would not run successfully because I had not set the HADOOP_GREMLIN_LIBS environment variable correctly, so the jars were not being deployed to the cluster for use by the graph computer. Once this was set, the Gremlin console and the Java example showed the same problem: both returned a count of zero.

    3) The count of zero was the hardest to solve, and again it came down to my not understanding the manual. There were many references to having Hadoop installed on the cluster, but nowhere did it say how to connect to Hadoop on the cluster. Doing so required an extra configuration option, fs.defaultFS, which tells Gremlin where to find the Hadoop filesystem on the cluster. Once this was set correctly, the vertex count was correct.
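
For point 2, a quick sanity check of HADOOP_GREMLIN_LIBS before launching the job can save some time. The following is only a sketch: the class and method names are made up for illustration, and it assumes the variable holds one or more directories separated by ':' (adjust the separator if your setup differs). It uses nothing but the Java standard library and does not call TinkerPop.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of TinkerPop or Titan.
final class HadoopGremlinLibsCheck {

    // Assumption: entries are separated by ':' as on a Unix-style path list.
    private static final String SEPARATOR = ":";

    // Splits a HADOOP_GREMLIN_LIBS-style value into trimmed, non-empty entries.
    // A null value (variable not set) yields an empty list.
    static List<String> splitEntries(String value) {
        List<String> entries = new ArrayList<>();
        if (value == null) {
            return entries;
        }
        for (String part : value.split(SEPARATOR)) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return entries;
    }

    // Returns the entries that do not point at an existing directory.
    static List<String> missingDirectories(List<String> entries) {
        List<String> missing = new ArrayList<>();
        for (String entry : entries) {
            if (!new File(entry).isDirectory()) {
                missing.add(entry);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> entries = splitEntries(System.getenv("HADOOP_GREMLIN_LIBS"));
        if (entries.isEmpty()) {
            System.out.println("HADOOP_GREMLIN_LIBS is not set or is empty");
            return;
        }
        for (String dir : missingDirectories(entries)) {
            System.out.println("Not a directory: " + dir);
        }
    }
}
```

Running it in the same shell that starts the Java program or the Gremlin console shows whether the variable is visible to that process at all, which was the part I had wrong.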

    My theory is that the computation executed correctly, but when the counts from the Spark workers were reduced, the result was persisted somewhere on the cluster. When the answer was then returned to the console, the local filesystem was consulted instead, nothing was found there, and so zero was returned. Perhaps this is a bug?
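
To make that theory a little more concrete, here is a small illustration of how the same scheme-less path can end up on two different filesystems depending on the default. It is only a sketch of the idea using java.net.URI: the class and method names are hypothetical, it does not call Hadoop, and it does not prove that this is what happened in my case. It relies on the fact that Hadoop's fs.defaultFS defaults to file:/// when nothing else is configured.

```java
import java.net.URI;

// Hypothetical illustration, not Hadoop's own path resolution code.
final class DefaultFsSketch {

    // Qualifies a path against a default filesystem URI.
    // A path that already carries a scheme is returned unchanged.
    static URI qualify(String defaultFs, String path) {
        URI pathUri = URI.create(path);
        if (pathUri.getScheme() != null) {
            return pathUri;
        }
        return URI.create(defaultFs).resolve(pathUri);
    }

    public static void main(String[] args) {
        String outputLocation = "/test/output";

        // With fs.defaultFS pointing at the cluster, the path refers to HDFS.
        System.out.println(qualify("hdfs://node1:9000", outputLocation));

        // With Hadoop's built-in default, the same path refers to the local disk
        // of whichever machine happens to resolve it.
        System.out.println(qualify("file:///", outputLocation));
    }
}
```

If the workers and the console each resolve the output location on their own, a local default would mean they are not looking at the same place, which would fit the zero count.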

    The final configuration file that worked for me is:

    gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
    gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
    gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
    gremlin.hadoop.jarsInDistributedCache=true
    gremlin.hadoop.inputLocation=none
    gremlin.hadoop.outputLocation=/test/output
    ####################################
    # Cassandra Cluster Config         #
    ####################################
    titanmr.ioformat.conf.storage.backend=cassandrathrift
    titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
    titanmr.ioformat.conf.storage.hostname=node1,node2,node3
    titanmr.ioformat.cf-name=edgestore
    ####################################
    # SparkGraphComputer Configuration #
    ####################################
    spark.master=spark://node1:7077
    spark.executor.memory=1g
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    ####################################
    # Apache Cassandra InputFormat configuration
    ####################################
    cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
    cassandra.input.keyspace=mindmapstest
    cassandra.input.predicate=0c00020b0001000000000b000200000000020003000800047fffffff0000
    cassandra.input.columnfamily=edgestore
    cassandra.range.batch.size=2147483647
    spark.eventLog.enabled=true
    ####################################
    # Hadoop Cluster configuration     #
    ####################################
    fs.defaultFS=hdfs://node1:9000
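
Since all three problems were configuration mistakes, a small pre-flight check of the properties file might have caught them earlier. The sketch below is hypothetical: the class name and the three rules are my own, derived only from the differences between my original file and the working one above (fs.defaultFS present, an output location other than "none", and cassandra.input.keyspace matching the Titan keyspace). It is not an official validator and the rules may not apply to other setups.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check, not part of TinkerPop or Titan.
final class SparkGraphConfigCheck {

    // Parses properties-file text held in a string.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns a human-readable message for each rule that is violated.
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();

        if (props.getProperty("fs.defaultFS") == null) {
            problems.add("fs.defaultFS is not set");
        }

        String output = props.getProperty("gremlin.hadoop.outputLocation");
        if (output == null || output.equals("none")) {
            problems.add("gremlin.hadoop.outputLocation is missing or 'none'");
        }

        String titanKeyspace =
                props.getProperty("titanmr.ioformat.conf.storage.cassandra.keyspace");
        String inputKeyspace = props.getProperty("cassandra.input.keyspace");
        if (inputKeyspace == null) {
            problems.add("cassandra.input.keyspace is not set");
        } else if (!inputKeyspace.equals(titanKeyspace)) {
            problems.add("cassandra.input.keyspace differs from the Titan keyspace");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java SparkGraphConfigCheck <properties-file>");
            return;
        }
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(args[0]))) {
            props.load(reader);
        }
        for (String problem : findProblems(props)) {
            System.out.println("WARN: " + problem);
        }
    }
}
```

Run against my original titan-cassandra-test-spark.properties, these rules would flag all three settings; run against the final file above, they would flag none.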