Search code examples
springactivemq-classicspring-websocketspring-messaging

Spring Messaging/ActiveMQ Acknowledged Message not received


I have been working on this issue for a while now and decided to ask for some help.

I have the following scenario. ActiveMQ server listening on port 61614. Two WebSocketStompClient's are connecting to the following queues;

Client1: /queue/request/server1 - /queue/replyto/server1

Client2: /queue/request/server2 - /queue/replyto/server2

The 2 servers communicate and request information with each other. I have no problem with these scenarios. Send request from server 1 to server 2 queue and receive response on server1 response queue.

Kind of like this: I don't have 10 reputations...

Sending SEND {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], receipt=[3]} etc..

Received MESSAGE {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519] etc...

Random Application Logic executed here...

Sending SEND {destination=[/queue/replyto/server1] ,session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
Received MESSAGE {destination=[/queue/replyto/server1], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...

However, there is a problem if you try to send another message to the request queue of server 1 before responding to the first request. The response is sent to the response queue of server 2 but it is never received.

The image here: Never receive the response( 3. RESPONSE) to the request (2. REQUEST)

Sending SEND { destination=[/queue/replyto/server2], session=[c504b2fe-ae63-1bc2-87ce-651682b7c98e],  receipt=[4], etc.
Received RECEIPT {receipt-id=[4]} session=c256762b-ddef-4109-8a3e-04bde832ed85

I hope it's clear enough, let me know if more explanation is required.

Additional information is that I am certain the message is sent as it is written in the queue and I can see it.

Queues

As you can also see, all the messages are dequeued/acknowledged.

Its worth mentioning that all of this is done locally with a docker ActiveMQ server and a unit test which mimics what happens on the development server.

With Wireshark, I can see that the message is acknowledged by server2.

send frame

tcp trace

Finally:

This is the setup and it is identical for both servers:

Client:

   WebSocketClient transport = new StandardWebSocketClient();
   WebSocketStompClient server1 = new WebSocketStompClient(transport);
   server1.setMessageConverter(new MappingJackson2MessageConverter());

    server1.setTaskScheduler(taskScheduler);
    server1.setDefaultHeartbeat(heartbeat);
    server1.connect(bindAddress, Server1SessionHandler);

Handler:

public class Server1SessionHandler extends StompSessionHandlerAdapter {
   @Override
    public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
        logger.debug("Entering RemoteSessionHandler after connected method");
        this.stompSession = session;

    if (session.isConnected()) {
        session.setAutoReceipt(true);

        logger.trace("Attempting to subscribe to channel {} using the RequestFrameHandler ", this.subscribeChannel);
        session.subscribe(this.subscribeChannel, new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));

        logger.trace("Attempting to subscribe to channel {} using the ResponseFrameHandler ", this.replyQueue);
        session.subscribe(this.replyQueue, new Server1ResponseFrameHandler(this.cache));

        publisher.publishEvent(new ConnectionSuccessEvent(Server1SessionHandler.class, "RemoteSessionHandler"));
    }
    else {
        logger.error("Could not connect to stomp Session {}", session.toString());
    }
}

}

Request Frame Handler:

public class Server1RequestFrameHandler implements StompFrameHandler {

private StompSession session;

public Server1RequestFrameHandler(final StompSession session) {
    this.session = session;
}

@Override
@SuppressWarnings("static-access")
public void handleFrame(final StompHeaders headers, final Object payload) {
    ...... BUSINESS LOGIC ......

            if (session.isConnected()) {
                session.send(header, response);
                logger.debug("Successfully connected using the stompsession {}, ", session.toString());
            }


}

@Override
public Type getPayloadType(final StompHeaders headers) {
    return Request.class;
}

}

Response Frame Handler:

public class Server1ResponseFrameHandler implements StompFrameHandler {

private Server1Cache cache;

public Server1ResponseFrameHandler(final Server1Cache cache) {
    this.cache = cache;
}

public void handleFrame(final StompHeaders headers, final Object payload) {
    Response response = (Response) payload;
    logger.debug("response: {}", response);
    cache.cacheMessage(id, response);
}

public Type getPayloadType(final StompHeaders headers) {
    return Response.class;
}

}

Let me know if you require more information.


Solution

  • I was able to bypass this by subscribing to the topics with 2 different WebSocket Connection setups. Such as: Create a Request client

    WebSocketClient requestTransport = new StandardWebSocketClient();
    WebSocketStompClient requestClient = new WebSocketStompClient(requestTransport);
    requestClient.setMessageConverter(new MappingJackson2MessageConverter());
    
    requestClient.setTaskScheduler(taskScheduler);
    requestClient.setDefaultHeartbeat(heartbeat);
    requestClient.connect(bindAddress, RequestSessionHandler);
    

    Create a separate response client

    WebSocketClient responseTransport = new StandardWebSocketClient();
    WebSocketStompClient responseClient = new WebSocketStompClient(responseTransport);
    responseClient.setMessageConverter(new MappingJackson2MessageConverter());
    
    responseClient.setTaskScheduler(taskScheduler);
    responseClient.setDefaultHeartbeat(heartbeat);
    responseClient.connect(bindAddress, ResponseSessionHandler);
    

    Where the request and response session handlers subscribe to only 1 Queue instead of both.

    public class RequestSessionHandler extends StompSessionHandlerAdapter {
    @Override
    public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
        ...
    
        if (session.isConnected()) {
           session.subscribe("requestChannel", new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));
       ....
        }
    }
    }
    

    If someone has explanation about why that was happening when subscribing to both queues using only 1 WebSocket Client, I would love to hear it.

    My theory is that the thread that is serving the websocket connection was busy with the request and was not free to handle the response that was sent.