java, events, messaging, nats.io

Nats request/reply in Java


I have previous Kafka experience and I've been playing around with Nats.io, which seems like a really solid choice for messaging.

In particular, I'm interested in the well-documented Request/Reply mechanism, but I've had trouble implementing it correctly in Java with the jnats driver.

This is my Connector:

 // Single server nats connection
    @PostConstruct
    public void connect() throws ExternalServiceUnavailableException {

        Options options = new Options.Builder()
                .server(connectionString)
                .maxReconnects(20)
                .reconnectWait(Duration.ofSeconds(5))
                .connectionTimeout(Duration.ofSeconds(5))
                .connectionListener((conn, type) -> {
                    if (type == ConnectionListener.Events.CONNECTED) {
                        LOG.info("Connected to Nats Server");
                    } else if (type == ConnectionListener.Events.RECONNECTED) {
                        LOG.info("Reconnected to Nats Server");
                    } else if (type == ConnectionListener.Events.DISCONNECTED) {
                        LOG.error("Disconnected from Nats Server, attempting to reconnect");
                    } else if (type == ConnectionListener.Events.CLOSED) {
                        LOG.info("Closed connection with Nats Server");
                    }
                })
                .build();


        try {
            connection = Nats.connect(options);

        } catch (Exception e) {
            LOG.error("Unable to connect to Nats Server");
            throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
        }

    }

This is the request method (with very high await time for testing purposes):

 public Optional<String> asyncRequest(String topic, String message) throws ExternalServiceUnavailableException {

        // Encode and decode with an explicit charset so both sides agree on UTF-8
        Future<Message> reply = natsConnector.getConnection().request(topic, message.getBytes(StandardCharsets.UTF_8));
        try {

            Message msg = reply.get(10L, TimeUnit.SECONDS);

            String body = new String(msg.getData(), StandardCharsets.UTF_8);
            LOG.info(body);

            return Optional.of(body);

        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            LOG.error("Unable to retrieve response for the sent request: " + message);
            throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
        }

    }

And this is the Response Handler with the reply mechanism:

 @PostConstruct
    private void init() {
        Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
        });

        Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
            JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));

            if (requestMessage.getString("requestType").equals("stock-status")) {

                if (requestMessage.getString("of").equals("all")) {

                    JSONObject response = assetQuery.retrieveYesterdayStockStatus();
                    LOG.info("response ready");
                    natsOperation.publishEvent("assets-info", response);
                    LOG.info("message sent");
                }
            }
        });
    }

My two independent Services communicate via a dockerized Nats.io, and I can confirm via the Nats Go client that both Services have sent messages on the same topic.

Unfortunately, when the "Requestor" invokes the asyncRequest function, it never receives the reply, even with a very long timeout in reply.get(...).

When I try to evaluate the reply Object in debug mode, it does not have any data in it and shows a TimeoutException.

At msg.getData() the program crashes.

Do you guys have any hints for me? Thanks!


Solution

  • You should change your "replyer" code to publish to the replyTo subject from the original message.

    @PostConstruct
    private void init() {
        Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {});
    
        Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
            JSONObject requestMessage = new JSONObject(
                new String(message.getData(), StandardCharsets.UTF_8)
            );
    
            if (requestMessage.getString("requestType").equals("stock-status")) {
                if (requestMessage.getString("of").equals("all")) {
                    JSONObject response = assetQuery.retrieveYesterdayStockStatus();
                    LOG.info("response ready");
                    // Changed: publish to the request's replyTo subject, not "assets-info"
                    natsOperation.publishEvent(message.getReplyTo(), response);
                    LOG.info("message sent");
                }
            }
        });
    }
    

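    The request/reply mechanism waits for a single response on the generated replyTo subject, a unique inbox subject created for the request. A response published back to "assets-info" never arrives on that inbox, which is why reply.get(...) ends in a TimeoutException.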

    See https://docs.nats.io/nats-concepts/reqreply
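
To make the difference concrete, here is a minimal in-memory sketch of the pattern. It deliberately does not use jnats: ToyBus, Msg, and RequestReplyDemo are hypothetical names invented for this illustration, and the real client manages inbox subjects internally. It only assumes what the answer states, namely that a request waits for one message on a generated reply subject.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a message bus; this is NOT the jnats API.
final class ToyBus {
    static final class Msg {
        final String subject;
        final String replyTo; // null for a plain publish
        final byte[] data;

        Msg(String subject, String replyTo, byte[] data) {
            this.subject = subject;
            this.replyTo = replyTo;
            this.data = data;
        }
    }

    private final Map<String, List<Consumer<Msg>>> handlers = new ConcurrentHashMap<>();

    void subscribe(String subject, Consumer<Msg> handler) {
        handlers.computeIfAbsent(subject, s -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Delivers synchronously to every handler registered on exactly this subject.
    void publish(String subject, String replyTo, byte[] data) {
        Msg msg = new Msg(subject, replyTo, data);
        for (Consumer<Msg> handler : handlers.getOrDefault(subject, List.of())) {
            handler.accept(msg);
        }
    }

    // Request: listen on a fresh inbox, then publish with that inbox as replyTo.
    CompletableFuture<Msg> request(String subject, byte[] data) {
        String inbox = "_INBOX." + UUID.randomUUID();
        CompletableFuture<Msg> reply = new CompletableFuture<>();
        subscribe(inbox, reply::complete);
        publish(subject, inbox, data);
        return reply;
    }
}

final class RequestReplyDemo {
    // Returns the reply text, or empty if nothing reached the inbox in time.
    static Optional<String> ask(ToyBus bus) {
        byte[] payload = "stock-status".getBytes(StandardCharsets.UTF_8);
        try {
            ToyBus.Msg msg = bus.request("assets-info", payload).get(100, TimeUnit.MILLISECONDS);
            return Optional.of(new String(msg.data, StandardCharsets.UTF_8));
        } catch (TimeoutException | ExecutionException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] answer = "in stock".getBytes(StandardCharsets.UTF_8);

        // Responder as in the question: answers on the request subject itself.
        ToyBus broken = new ToyBus();
        broken.subscribe("assets-info", m -> {
            if (m.replyTo != null) broken.publish("assets-info", null, answer);
        });
        System.out.println(ask(broken).isPresent()); // prints "false"

        // Responder as in the answer: publishes to the message's replyTo subject.
        ToyBus fixed = new ToyBus();
        fixed.subscribe("assets-info", m -> {
            if (m.replyTo != null) fixed.publish(m.replyTo, null, answer);
        });
        System.out.println(ask(fixed).orElse("no reply")); // prints "in stock"
    }
}
```

In the first case the response goes out on "assets-info", which the requester is not listening on, so the future is never completed and the wait times out, matching the behaviour described in the question.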