I want to receive messages through a MessageHandler
bound to consumer. When I use a core ClientMessage
as the container the message will never be acked with acknowledge()
, but ok with the individualAcknowledge()
method. It's quite different when using javax.jms.Message
.
The Code:
@Override
public void onMessage(ClientMessage message) {
try {
//acknowledge() method won't work, the message still in the queue
//message.acknowledge();
message.individualAcknowledge();
} catch (ActiveMQException e) {
log.error("message acknowledge error: ", e);
}
}
But when I simply change the parameter to javax.jms.Message
. The ack method worked. I've checked the origin code of both:
ClientMessage:
public void acknowledge(ClientMessage message) throws ActiveMQException {
ClientMessageInternal cmi = (ClientMessageInternal)message;
if (this.ackIndividually) {
this.individualAcknowledge(message);
} else {
this.ackBytes += message.getEncodeSize();
if (logger.isTraceEnabled()) {
logger.trace(this + "::acknowledge ackBytes=" + this.ackBytes + " and ackBatchSize=" + this.ackBatchSize + ", encodeSize=" + message.getEncodeSize());
}
if (this.ackBytes >= this.ackBatchSize) {
if (logger.isTraceEnabled()) {
logger.trace(this + ":: acknowledge acking " + cmi);
}
this.doAck(cmi);
} else {
if (logger.isTraceEnabled()) {
logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
}
this.lastAckedMessage = cmi;
}
}
}
And the javax.jms.Message
:
public void acknowledge() throws JMSException {
if (this.session != null) {
try {
if (this.session.isClosed()) {
throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
}
if (this.individualAck) {
this.message.individualAcknowledge();
}
if (this.clientAck || this.individualAck) {
this.session.commit(this.session.isBlockOnAcknowledge());
}
} catch (ActiveMQException var2) {
throw JMSExceptionHelper.convertFromActiveMQException(var2);
}
}
}
It's quite obviously that javax.jms.Message
commits the session whenever a message is acked, but ClientMessage
will calculate whether the acked message is reach the max size of the ackBatchSize
, if not, it will store this message in a parameter called lastAckedMessage.
So,my question is:
lastAckedMessage
do when I just have 1 message that is too far too reach the bathSize? How could I ack the message? Close the session to force commit?The session is configured to autoCommitSends and autoCommitAcks.
After I read the doc, I try to change the acknowledge()
to individualAcknowledge()
and it works, but i'm still wondering why the acknowledge()
can't.
The core API is a relatively low-level API aimed at fine-grained control for maximizing performance. It shares some similarities with the JMS API, but some important differences as well.
Specifically regarding consumers, acknowledgements are costly in terms of performance because they require a round-trip to the broker. Therefore, there are several ways to tune acknowledgements in the core API.
The first tuning step is choosing whether or not to automatically commit acknowledgements. This decision is made when creating the session (e.g. via this method). If acknowledgements are automatically committed then the application never has to invoke commit()
itself, but that also means it can't support more advanced use-cases which would require rollback()
.
If the user chooses to automatically commit acknowledgements, as you have, then the next tuning step is how often these acknowledgements should be committed. This decision is made by specifying the "acknowledgement batch size" (e.g. via this method).
If you want to automatically acknowledge messages and you want the acknowledgements to be committed immediately then set your ackBatchSize to 0
. If you want full control over acknowledgements then don't automatically acknowledge the messages and invoke commit()
"manually". However, keep in mind that forcing a round-trip to the broker for every acknowledgement will reduce performance (potentially considerably).
Lastly, the lastAckedMessage
is tracked so that only one acknowledgement needs to be sent to the broker (i.e. the most recent) and then every message sent before that one is acknowledged as well. This is an optimization which relieves the client from the burden of having to send an individual acknowledgement for every single message it has received.