Search code examples
gremlintinkerpopjanusgraphtinkerpop3gremlin-server

Mutation Listener Event Strategy with remote JanusGraph is throwing serialization exception


I'd like to listen to mutations on a remote JanusGraph and I'm unable to figure out the correct setup to make it work.

JanusGraph stack:
JanusGraph docker image **0.5.2 (which is using Apache TinkerPop Gremlin 3.4.6) with cql-es configuration
Cassandra docker image 3.11.6
ElasticSearch docker image 7.3.1
Serializers section of gremlin-server-cql-es.yaml is updated with the following line:

- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry, org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerIoRegistryV3d0] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { serializeResultToString: true }}

Java client stack: Based on pluradj/janusgraph-java-example
Java8
janusgraph-core 0.5.2
gremlin-driver 3.4.6
remote-objects.yaml looks as follows:

hosts: [127.0.0.1]
port: 8182
serializer: {
  className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0,
  config: {
    ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry, org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerIoRegistryV3d0]
  }
}

Complete code (without ConsoleMutationListener) looks like this:

public static void main(String[] args) {

    MutationListener mutationListener = new ConsoleMutationListener("Test");
    EventStrategy eventStrategy = EventStrategy.build().addListener(mutationListener).create();

    try (GraphTraversalSource g = AnonymousTraversalSource.traversal()
        .withRemote("conf/remote-graph.properties")
        .withStrategies(eventStrategy)) {

        g.addV("person").property("name", "Test").next();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

ConsoleMutationListener is a copy of TinkerPop's sample ConsoleMutationListener with modified constructor to accept graph name instead of a full graph, since toString() was the only method used anyways.

Stack trace:

io.netty.handler.codec.EncoderException: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: An error occurred during serialization of this request [RequestMessage{, requestId=9436b08c-7e31-4fc0-b480-40904055f491, op='bytecode', processor='traversal', args={gremlin=[[withStrategies(EventStrategy)], [addV(person), property(name, Test)]], aliases={g=g}}}] - it could not be sent to the server - Reason: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: java.lang.IllegalArgumentException: Class is not registered: org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy
Note: To register this class use: kryo.register(org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy.class);
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:708)
    at io.netty.channel.AbstractChannelHandlerContext.access$1700(AbstractChannelHandlerContext.java:56)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1102)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1149)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1073)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:510)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:518)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: An error occurred during serialization of this request [RequestMessage{, requestId=9436b08c-7e31-4fc0-b480-40904055f491, op='bytecode', processor='traversal', args={gremlin=[[withStrategies(EventStrategy)], [addV(person), property(name, Test)]], aliases={g=g}}}] - it could not be sent to the server - Reason: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: java.lang.IllegalArgumentException: Class is not registered: org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy
Note: To register this class use: kryo.register(org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy.class);
    at org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder.encode(WebSocketGremlinRequestEncoder.java:60)
    at org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder.encode(WebSocketGremlinRequestEncoder.java:38)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
    ... 12 more

If I remove the withStrategies(eventStrategy) the Vertex is added to the graph and I'm also able to query the graph normally. However I'm not able to configure the GraphTraversalSource with EventStrategy.

Q1: What I'm thinking is that message with defined Event Strategy cannot be serialized with GryoMessageSerializerV3d0 or Mutation Listener/Event Strategy should somehow be registered on the server side, but I can't find any references on how to do that. Are there any examples of such configuration?

Q2: What am I doing wrong? Is it even possible to use TinkerPop's EventStrategy with JanusGraph?

Q3: Is there any other approach to listen to remote JanusGraph mutations?

Changing serializer to GraphSONMessageSerializerV3d0 gives:

java.util.concurrent.CompletionException: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: EventStrategy does can only be constructed with instance() or create(Configuration)
    at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
...
Caused by: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: EventStrategy does can only be constructed with instance() or create(Configuration)

Changing serializer to GraphBinaryMessageSerializerV1 gives:

java.util.concurrent.CompletionException: io.netty.handler.codec.DecoderException: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: The most significant bit should be set according to the format
    at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
...
Caused by: io.netty.handler.codec.DecoderException: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: The most significant bit should be set according to the format


Solution

  • Q1: What I'm thinking is that message with defined Event Strategy cannot be serialized with GryoMessageSerializerV3d0 or Mutation Listener/Event Strategy should somehow be registered on the server side, but I can't find any references on how to do that.

    That is correct. EventStrategy does not work across remote connections.

    Q2: What am I doing wrong? Is it even possible to use TinkerPop's EventStrategy with JanusGraph?

    It is possible to use it with JanusGraph but only in embedded mode as the MutationListener implementations do not know how to send events back to the client. The driver would likely need some significant changes to introduce a mechanism to support that so it is a non-trivial change. If that were figured out then there still remain serialization issues to sort out for users who supply custom MutationListeners (though perhaps maybe that just wouldn't be allowed).

    Q3: Is there any other approach to listen to remote JanusGraph mutations?

    The key word there is "remote" and I don't think anything exists currently to allow that. You would need to build you're own of some sort. One way might be to configure "g" with EventStrategy on the server and then add a MutationListener that would send those events to a separate queue that you could consume remotely. You might also consider looking at the JanusGraph Bus and devise a similar scheme.