Tags: java, nats.io, nats-streaming-server

Can't handle a message from NATS in my java/kotlin app


I have a class that needs to send a message to a NATS topic, receive the response, and then process it. Here is the service method from my Java Spring app:

public CreateRenewRsEvent getOrderCreationResponse(CreateRenewRqEvent createRenewRqEvent) {
    try {
        log.info("Sending order.create event: {}", objectMapper.writeValueAsString(createRenewRqEvent));
        final Message response = natsConnection.request(
                natsProperties.getSendTopic(),
                objectMapper.writeValueAsBytes(createRenewRqEvent),
                natsProperties.getTimeout()
        );
        log.info("Received order.create response: {}", new String(response.getData(), StandardCharsets.UTF_8));
        response.ack();
        return objectMapper.readValue(response.getData(), CreateRenewRsEvent.class);
    } catch (Exception e) {
        log.warn("Error sending order.create message", e);
    }
    return null;
}

I'm running the server locally in Docker, and a method in another bean is supposed to handle the message:

@PostConstruct
fun setUp() {
    natsConnection.createDispatcher { handleMessage(it) }
        .subscribe(natsProperties.sendTopic)
}

private fun handleMessage(msg: Message) {
    val receivedMessage = String(msg.data, StandardCharsets.UTF_8)
    val jsonNode = objectMapper.readTree(receivedMessage)
    if (!jsonNode.has("response") || !jsonNode.has("err")) {
        logger.info("Received order.create message: $receivedMessage")
        val responseEvent = getOrderCreateResponse(receivedMessage)
        natsConnection.publish(
            natsProperties.sendTopic,
            objectMapper.writeValueAsBytes(responseEvent)
        )
        logger.info("Sending order.create response: $responseEvent")
    }
}

The problem is that when I try to read the message in my first method, I get a NullPointerException: the response message is null, so there is no body to read.

I tried using a dispatcher with the nextMessage() method, and that worked, but I really need to use the request() method, because this is an integration with an external system.

What am I doing wrong?

P.S. Some more facts:

  1. The connection opens, so both the sender and the receiver are connected to the local server.
  2. When I send the message in debug mode, I see only one topic on the server: order.create. After the message is handled, a second one appears: "_INBOX.rel468mZPCUYu9tUu48JRa.rel468mZPCUYu9tUu48JhC". It is the value of the msg.replyTo() field, but sending my response to it doesn't help either :(
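
For context on the "_INBOX..." subject mentioned above: in NATS request-reply, the requester listens on a unique inbox subject and passes it along as the reply-to of the request, so a reply published anywhere else never reaches the waiting request() call. The sketch below illustrates that mechanism with a tiny in-memory bus. It does not use the NATS client at all; Bus, replied and the payloads are hypothetical names made up for the illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

class InboxSketch {

    // Minimal synchronous in-memory bus: subject -> handlers receiving (replyTo, payload).
    static final class Bus {
        private final Map<String, List<BiConsumer<String, String>>> subs = new ConcurrentHashMap<>();

        void subscribe(String subject, BiConsumer<String, String> handler) {
            subs.computeIfAbsent(subject, k -> new CopyOnWriteArrayList<>()).add(handler);
        }

        void publish(String subject, String replyTo, String payload) {
            List<BiConsumer<String, String>> handlers =
                    subs.getOrDefault(subject, Collections.emptyList());
            for (BiConsumer<String, String> handler : handlers) {
                handler.accept(replyTo, payload);
            }
        }

        // Request: listen on a fresh inbox, publish with that inbox as reply-to,
        // and return a future that completes with the first message sent to the inbox.
        CompletableFuture<String> request(String subject, String payload) {
            String inbox = "_INBOX." + UUID.randomUUID();
            CompletableFuture<String> reply = new CompletableFuture<>();
            subscribe(inbox, (ignoredReplyTo, data) -> reply.complete(data));
            publish(subject, inbox, payload);
            return reply;
        }
    }

    // Returns true if the request received a reply.
    static boolean replied(boolean useReplyTo) {
        Bus bus = new Bus();
        bus.subscribe("order.create", (replyTo, data) -> {
            if (replyTo == null) {
                return; // not a request (e.g. our own response echoed back): ignore it
            }
            String target = useReplyTo ? replyTo : "order.create";
            bus.publish(target, null, "{\"status\":\"created\"}");
        });
        return bus.request("order.create", "{}").isDone();
    }

    public static void main(String[] args) {
        System.out.println("reply sent to request subject: " + replied(false)); // false
        System.out.println("reply sent to inbox subject:   " + replied(true));  // true
    }
}
```

With the real client, the corresponding step in the handler would be publishing the response to the message's reply-to subject rather than to the request topic.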

Solution

  • I was both dumb and blind. All I needed to do was consume the response asynchronously, as a CompletableFuture<Message>. Everything else works.
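
A minimal sketch of what consuming the reply through a future could look like, under some assumptions. In the Java NATS client, the two-argument Connection.request(subject, body) overload returns a CompletableFuture<Message>, whereas the overload that takes a timeout returns a Message that is null when no reply arrives in time. To keep the example runnable without a server, a plain BiFunction stands in for the connection and byte arrays stand in for Message; requestReply and the payloads are hypothetical names, not part of the original code.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

class AsyncReplySketch {

    // Sends a request through the given requester (a stand-in for the client call that
    // returns a future reply) and waits up to `timeout` for the answer.
    // An empty Optional means "no usable reply", so callers never touch a null message.
    static Optional<String> requestReply(
            BiFunction<String, byte[], CompletableFuture<byte[]>> requester,
            String subject, String payload, Duration timeout) {
        CompletableFuture<byte[]> pending =
                requester.apply(subject, payload.getBytes(StandardCharsets.UTF_8));
        try {
            byte[] data = pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return Optional.ofNullable(data).map(d -> new String(d, StandardCharsets.UTF_8));
        } catch (TimeoutException e) {
            pending.cancel(true); // nobody answered in time
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty(); // the request itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // A responder that answers immediately.
        Optional<String> ok = requestReply(
                (subject, body) -> CompletableFuture.completedFuture(
                        "{\"status\":\"created\"}".getBytes(StandardCharsets.UTF_8)),
                "order.create", "{}", Duration.ofMillis(200));
        System.out.println(ok.orElse("no reply"));

        // A responder that never answers: the future stays pending until the timeout.
        Optional<String> none = requestReply(
                (subject, body) -> new CompletableFuture<byte[]>(),
                "order.create", "{}", Duration.ofMillis(200));
        System.out.println(none.orElse("no reply"));
    }
}
```

In the service method from the question, the same shape would mean taking the CompletableFuture<Message> from the connection, waiting on it with the configured timeout, and only then reading getData() from the result.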