Search code examples
gremlin-serverjanusgraph

How to connect to Gremlin Server through Java using the Gremlin Driver with sessionless


I want my application to connect to two remote servers Gremlinserver/Janusserver.Both have the same Cassandra database. And in this way thatI would have high availability.

<dependency>
    <groupId>org.janusgraph</groupId>
    <artifactId>janusgraph-core</artifactId>
    <version>0.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.tinkerpop</groupId>
    <artifactId>gremlin-driver</artifactId>
    <version>3.2.6</version>
</dependency>

file gremlin.yaml:

hosts: [127.0.0.1,192.168.2.57]
port: 8182
serializer: { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }}

In my service class I have several methods, each of which connects through the client object:

public class GremlinServiceConcrete implements GremlinService {
...
..
public Set<Long> getImpactedComponentsIds (...) throws GremlinServiceException {
..
        Cluster cluster = gremlinCluster.getCluster();
        Client client = null;
        Set<Long> impactedIds = Sets.newHashSet();
        try {
            client = cluster.connect();
            binding = Maps.newLinkedHashMap();
..

And in the GremlinCluster Class, I call the driver

public class GremlinCluster {

    public static final int MIN_CONNECTION_POOL_SIZE = 2;
    public static final int MAX_CONNECTION_POOL_SIZE = 20;
    public static final int MAX_CONTENT_LENGTH = 65536000;

    private static Logger logger = LoggerFactory.getLogger(GremlinCluster.class);

    private String server;
    private Integer port;

    private Cluster cluster;

    public GremlinCluster(String server, Integer port) throws FileNotFoundException {
        this.server = Objects.requireNonNull(server);
        this.port = Objects.requireNonNull(port);
        this.cluster = init();
    }

    private Cluster init() throws FileNotFoundException {
        GryoMapper.Builder kryo = GryoMapper.build().addRegistry(JanusGraphIoRegistry.getInstance());
        MessageSerializer serializer = new GryoMessageSerializerV1d0(kryo);
        Cluster cluster = Cluster.build(new File("conf/driver-gremlin.yaml")).port(port)
                .serializer(serializer)
                .minConnectionPoolSize(MIN_CONNECTION_POOL_SIZE)
                .maxConnectionPoolSize(MAX_CONNECTION_POOL_SIZE)
                .maxContentLength(MAX_CONTENT_LENGTH).create();

        logger.debug(String.format("New cluster connected at %s:%s", server, port));
        return cluster;
    }

    public Cluster getCluster() {
        return cluster;
    }

    public void destroy() {
        try {
            cluster.close();
        } catch (Exception e) {
            logger.debug("Error closing cluster connection: " + e.toString());
        }
    }

}

The application works well by connecting to only one server. When you connect to the servers it runs very slow. If I stop a server does not run failover correctly I suspect that the servers are connected in session mode. The Tinkerpop documentation does not specify the code differences between the two modes.

Correction: The slowness was due to the debug mode of eclipse. The application sends requests to both gremlinservers, this part of the clustering functionality works fine.

Incorrect operation happens when a server is shutdown. The application sends the requests to the other server. If the downed server is booted, the gremlin server does not detect it and does not reconnect.

output from gremlinserver: enter image description here

GremlinCluster is a spring bean (beans-services.xml):

<bean id="gremlinCluster" class="[Fully qualified name].GremlinCluster" scope="singleton" destroy-method="destroy">
    <constructor-arg name="server"><value>${GremlinServerHost}</value></constructor-arg>
    <constructor-arg name="port"><value>${GremlinServerPort}</value></constructor-arg>
</bean>

And in a property file.

GremlinServerHost=[Fully qualified name]/config/gremlin.yaml
GremlinServerPort=8182

And in the GremlinCluster Class:

import java.util.Objects;

import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;

public class GremlinCluster {

    public static final int MIN_CONNECTION_POOL_SIZE = 2;
    public static final int MAX_CONNECTION_POOL_SIZE = 20;
    public static final int MAX_CONTENT_LENGTH = 65536000;

    private static Logger logger = LoggerFactory.getLogger(GremlinCluster.class);

    private String server;
    private Integer port;

    private Cluster cluster;

    public GremlinCluster(String server, Integer port) throws FileNotFoundException {
        this.server = Objects.requireNonNull(server);
        this.port = Objects.requireNonNull(port);
        this.cluster = init();
    }

    private Cluster init() throws FileNotFoundException {
        GryoMapper.Builder kryo = GryoMapper.build().addRegistry(JanusGraphIoRegistry.getInstance());
        MessageSerializer serializer = new GryoMessageSerializerV1d0(kryo);
        Cluster cluster = Cluster.build(new File(server)).port(port)
                .serializer(serializer)
                .minConnectionPoolSize(MIN_CONNECTION_POOL_SIZE)
                .maxConnectionPoolSize(MAX_CONNECTION_POOL_SIZE)
                .maxContentLength(MAX_CONTENT_LENGTH).create();

        logger.debug(String.format("New cluster connected at %s:%s", server, port));
        return cluster;
    }

    public Cluster getCluster() {
        return cluster;
    }

    public void destroy() {
        try {
            cluster.close();
        } catch (Exception e) {
            logger.debug("Error closing cluster connection: " + e.toString());
        }
    }

}

And a example with a method that makes a query (GremlinServiceConcrete):

@Override
    public Long getNeighborsCount(List<Long> componentIds) throws GremlinServiceException {
        // Check argument is right
        if (componentIds == null || componentIds.isEmpty()) {
            throw new GremlinServiceException("Cannot compute neighbors count with an empty list as argument");
        }

        Cluster cluster = gremlinCluster.getCluster();
        Client client = null;
        try {
            client = cluster.connect();
            String gremlin = "g.V(componentIds).both().dedup().count()";
            Map<String, Object> parameters = Maps.newHashMap();
            parameters.put("componentIds", componentIds);

            if (logger.isDebugEnabled()) logger.debug("Submiting query [ " + gremlin + " ] with binding [ " + parameters + "]");

            ResultSet resultSet = client.submit(gremlin, parameters);
            Result result = resultSet.one();
            return result.getLong();

        } catch (Exception e) {
            throw new GremlinServiceException("Error retrieving how many neighbors do vertices " + componentIds + " have: " + e.getMessage(), e);

        } finally {
            if (client != null) try { client.close(); } catch (Exception e) { /* NPE because connection was not initialized yet */ }
        }
    }

gremlin-server.yaml:

host: 127.0.0.1
port: 8182
scriptEvaluationTimeout: 600000
channelizer: org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer
graphs: {
  graph: conf/janusgraph-cassandra.properties
}
plugins:
  - janusgraph.imports
scriptEngines: {
  gremlin-groovy: {
    imports: [java.lang.Math,org.janusgraph.core.schema.Mapping],
    staticImports: [java.lang.Math.PI],
    scripts: [scripts/empty-sample.groovy]}}
serializers:
  - {
      className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0,
      config: {
        bufferSize: 819200,
        ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry]
      }
    }
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoLiteMessageSerializerV1d0, config: {ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV2d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}
processors:
  - { className: org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor, config: { sessionTimeout: 28800000 }}
  - { className: org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor, config: { cacheExpirationTime: 600000, cacheMaxSize: 1000 }}
metrics: {
  consoleReporter: {enabled: true, interval: 180000},
  csvReporter: {enabled: true, interval: 180000, fileName: /tmp/gremlin-server-metrics.csv},
  jmxReporter: {enabled: true},
  slf4jReporter: {enabled: true, interval: 180000},
  gangliaReporter: {enabled: false, interval: 180000, addressingMode: MULTICAST},
  graphiteReporter: {enabled: false, interval: 180000}}
maxInitialLineLength: 4096
maxHeaderSize: 8192
maxChunkSize: 4096000
maxContentLength: 65536000
maxAccumulationBufferComponents: 1024
resultIterationBatchSize: 64
writeBufferLowWaterMark: 32768
writeBufferHighWaterMark: 655360

janusgraph-cassandra.properties:

gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=cassandrathrift
storage.hostname=192.168.2.57,192.168.2.70,192.168.2.77
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5
#storage.cassandra.replication-strategy-class=org.apache.cassandra.locator.NetworkTopologyStrategy
#storage.cassandra.replication-strategy-options=dc1,2,dc2,1
storage.cassandra.read-consistency-level=QUORUM
storage.cassandra.write-consistency-level=QUORUM
ids.authority.conflict-avoidance-mode=GLOBAL_AUTO

Solution

  • If I understand correctly, you're saying that if a Gremlin Server goes down, the requests start routing to the server exclusively but when that downed server comes back online the client doesn't recognize that it is back so all the requests continue to flow to the one server that stayed up the entire time. If that is right, I can't recreate your problem, at least on Gremlin Server 3.3.0 (though I don't suspect a different behavior on 3.2.x as I don't know of any real changes that have occurred to the driver in 3.3.0 that did not also occur on 3.2.x).

    Your code doesn't really fully show how you are testing though. In my tests I used the Gremlin Console to do this:

    gremlin> cluster = Cluster.build().addContactPoint("192.168.1.7").addContactPoint("192.168.1.6").create()
    ==>/192.168.1.7:8182, localhost/127.0.0.1:8182
    gremlin> client = cluster.connect()
    ==>org.apache.tinkerpop.gremlin.driver.Client$ClusteredClient@1bd0b0e5
    gremlin> (0..<100000).collect{client.submit("1+1").all().get()}.toList();[]
    java.util.concurrent.ExecutionException: java.nio.channels.ClosedChannelException
    Type ':help' or ':h' for help.
    Display stack trace? [yN]n
    gremlin> (0..<100000).collect{client.submit("1+1").all().get()}.toList();[]
    

    The ClosedChannelException showed where I killed a server. I then took note from Gremlin Server logs of how many requests had been submitted to the server that stayed online. I then restarted the server I had killed and restarted the flow of requests in the Gremlin Console. When I looked at both request counts they had both incremented, which means that the driver was able to detect that the downed server had come back online.

    It's not clear from your question how you are determining that the driver is not reconnecting, but I'm noticing that you are also creating and destroying the Cluster object in a way that looks like it is being done per request to your getImpactedComponentsIds application service. You should really just create the Cluster object once and re-use it. It has expensive object creation as it spins up a number of network resource pools. It is possible that you are not seeing reconnect because of this create/destroy approach.

    While thinking about this, I though that I could envision a scenario where the create/destroy approach for Cluster could make things appear as though reconnect wasn't happening, but the load balancing approach in the driver should randomly select a host at creation so unless you were massively unlucky for that random selection to always go to the same host for every single test you did, you should have seen it connect at least some of the time to the downed server.