Search code examples
graphqlvert.xvertx-httpclient

Handle Subscription in vertx GraphQL


I tried to use Vertx HttpClient/WebClient to consume the GraphQLSubscritpion but it did not work as expected.

The server-side related code(written with Vertx Web GraphQL) is like the following, when a comment is added, then trigger onNext to send the comment to the Publisher.

    public VertxDataFetcher<UUID> addComment() {
        return VertxDataFetcher.create((DataFetchingEnvironment dfe) -> {
            var commentInputArg = dfe.getArgument("commentInput");
            var jacksonMapper = DatabindCodec.mapper();
            var input = jacksonMapper.convertValue(commentInputArg, CommentInput.class);
            return this.posts.addComment(input)
                .onSuccess(id -> this.posts.getCommentById(id.toString())
                                        .onSuccess(c ->subject.onNext(c)));
        });
    }

    private BehaviorSubject<Comment> subject = BehaviorSubject.create();

    public  DataFetcher<Publisher<Comment>> commentAdded() {
        return (DataFetchingEnvironment dfe) -> {
            ConnectableObservable<Comment> connectableObservable = subject.share().publish();
            connectableObservable.connect();
            return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
        };
    }

In the client, I mixed to use the HttpClient/WebClient, most of the time, I would like to use WebClient, which easier for handling form post. But it seems it does not work have a WebSocket connection.

So the websocket part is returning to use HttpClient.


 var options = new HttpClientOptions()
            .setDefaultHost("localhost")
            .setDefaultPort(8080);

        var httpClient = vertx.createHttpClient(options);
        httpClient.webSocket("/graphql")
            .onSuccess(ws -> {
ws.textMessageHandler(text -> log.info("web socket message handler:{}", text));

                JsonObject messageInit = new JsonObject()
                    .put("type", "connection_init")
                    .put("id", "1");

                JsonObject message = new JsonObject()
                    .put("payload", new JsonObject()
                        .put("query", "subscription onCommentAdded { commentAdded { id content } }"))
                    .put("type", "start")
                    .put("id", "1");

                ws.write(messageInit.toBuffer());
                ws.write(message.toBuffer());

})
            .onFailure(e -> log.error("error: {}", e));


// this client here is WebClient.
        client.post("/graphql")
            .sendJson(Map.of(
                "query", "mutation addComment($input:CommentInput!){ addComment(commentInput:$input) }",
                "variables", Map.of(
                    "input", Map.of(
                        "postId", id,
                        "content", "comment content of post id" + LocalDateTime.now()
                    )
                )
            ))
            .onSuccess(
                data -> log.info("data of addComment: {}", data.bodyAsString())
            )
            .onFailure(e -> log.error("error: {}", e));

When running the client and server, the comment is added, but the WebSocket client does not print any info about websocket message. On the server console, there is an message like this.

2021-06-25 18:45:44,356 DEBUG [vert.x-eventloop-thread-1] graphql.GraphQL: Execution '182965bb-80de-416d-b5fe-fe157ab87f1c' completed with zero errors

It seems the backend commentAdded datafetcher is not invoked at all.

The complete codes of GraphQL client and server are shared on my Github.


Solution

  • After reading some testing codes of Vertx Web GraphQL, I found I have to add the ConnectionInitHandler on ApolloWSHandler like this.

    .connectionInitHandler(connectionInitEvent -> {
            JsonObject payload = connectionInitEvent.message().content().getJsonObject("payload");
            if (payload != null && payload.containsKey("rejectMessage")) {
              connectionInitEvent.fail(payload.getString("rejectMessage"));
              return;
            }
            connectionInitEvent.complete(payload);
          }
    )
    
    

    When the client sends connection_init message, the connectionInitEvent.complete is required to start the communication between the client and the server.