Acknowledged messages are redelivered when crashed core client reconnects


I have set up a standalone HornetQ instance which is running locally. For testing purposes I have created a consumer using the HornetQ core API which receives a message every 500 milliseconds.

I am seeing strange behaviour on the consumer side. My client connects and reads all the messages from the queue. If I then force it to shut down (without properly closing the session/connection), the next time I start the consumer it reads the old messages from the queue again. Here is my consumer example:

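// HornetQ Consumer Code
// Imports needed by this method (HornetQ core client API):
// import org.hornetq.api.core.HornetQException;
// import org.hornetq.api.core.client.ClientConsumer;
// import org.hornetq.api.core.client.ClientMessage;
// import org.hornetq.api.core.client.ClientSession;

public void readMessage() {
    ClientSession session = null;
    try {
        if (sf != null) {
            // Arguments are (autoCommitSends, autoCommitAcks).
            session = sf.createSession(true, true);

            ClientConsumer messageConsumer = session.createConsumer(JMS_QUEUE_NAME);
            session.start();

            while (true) {
                // Wait up to one second for the next message.
                ClientMessage messageReceived = messageConsumer.receive(1000);
                if (messageReceived != null && messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME) != null) {
                    System.out.println("Received JMS TextMessage:" + messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME));
                    messageReceived.acknowledge();
                }

                Thread.sleep(500);
            }
        }
    } catch (Exception e) {
        LOGGER.error("Error while consuming messages.", e);
    } finally {
        try {
            // session stays null if sf was null or createSession failed.
            if (session != null) {
                session.close();
            }
        } catch (HornetQException e) {
            LOGGER.error("Error while closing consumer session.", e);
        }
    }
}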

Can someone tell me why it works like this, and what configuration I should use on the client or server side so that a message is deleted from the queue once the consumer has read it?


Solution

  • You are not committing the session after the acknowledgements are complete, and you are not creating the session with auto-commit for acknowledgements enabled. Therefore, you should do one of the following:

    • Either explicitly call session.commit() after one or more invocations of acknowledge()
    • Or enable implicit auto-commit for acknowledgements by creating the session using sf.createSession(true,true) or sf.createSession(false,true) (the boolean which controls auto-commit for acknowledgements is the second one).

    Keep in mind that when you enable auto-commit for acknowledgements there is an internal buffer which needs to reach a particular size before the acknowledgements are flushed to the broker. Batching acknowledgements like this can drastically improve performance for certain high-volume use cases. By default you need to acknowledge 1,048,576 bytes (1 MiB) worth of messages in order to flush the buffer and send the acknowledgements to the broker. You can change the size of this buffer by invoking setAckBatchSize on your ServerLocator instance or by using a different createSession method (e.g. sf.createSession(true, true, myAckBatchSize)).

    If the acknowledgement buffer isn't flushed and your client crashes, the corresponding messages will still be in the queue when the client comes back. If the consumer is closed gracefully, the buffer is flushed even if it hasn't reached its threshold.
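
The batching behaviour described above can be illustrated with a small toy model. This is only a sketch of the idea, not HornetQ code: the `Broker` and `Consumer` classes and the `remainingAfter` helper are hypothetical names invented for this example, and the model assumes that acknowledgements are flushed as soon as the buffered byte count reaches the batch size. It counts how many messages are still in the queue after a consumer acknowledges everything and then either crashes or closes gracefully.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of acknowledgement batching. Hypothetical classes, not the HornetQ API. */
public class AckBatchModel {

    /** Stand-in for the broker: the queue holds one entry (size in bytes) per message. */
    static final class Broker {
        final Deque<Integer> queue = new ArrayDeque<>();
    }

    /** Stand-in for a consumer that buffers acknowledgements on the client side. */
    static final class Consumer {
        private final Broker broker;
        private final int ackBatchSize;
        private int pendingBytes = 0;
        private int pendingAcks = 0;

        Consumer(Broker broker, int ackBatchSize) {
            this.broker = broker;
            this.ackBatchSize = ackBatchSize;
        }

        /** Buffers the acknowledgement; flushes once the byte threshold is reached. */
        void acknowledge(int messageBytes) {
            pendingBytes += messageBytes;
            pendingAcks++;
            if (pendingBytes >= ackBatchSize) {
                flush();
            }
        }

        /** Sends buffered acknowledgements: the broker removes those messages. */
        void flush() {
            for (int i = 0; i < pendingAcks; i++) {
                broker.queue.pollFirst();
            }
            pendingBytes = 0;
            pendingAcks = 0;
        }

        /** A graceful close flushes whatever is still buffered. */
        void close() {
            flush();
        }
    }

    /** Returns how many messages remain queued after all of them were acknowledged. */
    public static int remainingAfter(int messages, int messageBytes, int ackBatchSize, boolean graceful) {
        Broker broker = new Broker();
        for (int i = 0; i < messages; i++) {
            broker.queue.add(messageBytes);
        }
        Consumer consumer = new Consumer(broker, ackBatchSize);
        for (int i = 0; i < messages; i++) {
            consumer.acknowledge(messageBytes);
        }
        if (graceful) {
            consumer.close(); // skipped when simulating a crash
        }
        return broker.queue.size();
    }

    public static void main(String[] args) {
        int defaultBatch = 1_048_576; // default ack batch size quoted in the answer
        System.out.println("crash, default batch:    " + remainingAfter(100, 1024, defaultBatch, false));
        System.out.println("graceful, default batch: " + remainingAfter(100, 1024, defaultBatch, true));
        System.out.println("crash, batch size 0:     " + remainingAfter(100, 1024, 0, false));
    }
}
```

In this model, 100 messages of 1,024 bytes add up to 102,400 bytes, well below the 1,048,576-byte default, so a crash leaves all 100 in the queue. A graceful close or a batch size of 0 leaves none. For a low-volume test consumer like the one in the question, a small batch size passed via sf.createSession(true, true, myAckBatchSize) is the analogous setting to try.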