The JMS 1.1 spec, section 4.4.11, says, "Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been delivered by its session."
However, this is not the behavior that I observe with Solace. I wrote the following program, which sends twenty messages to a queue and consumes them with a listener that acknowledges only the odd-numbered ones. The result is that all even-numbered messages remain on the queue.
So does Solace violate the JMS spec, or am I missing something?
package com.example;

import java.util.function.Predicate;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.TextMessage;

import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;

public class SolaceAckTest {
    private static final String host = "localhost";
    private static final String username = "MyUser";
    private static final String password = "MyPassword";
    private static final String COUNTER_PROPERTY_NAME = "MyCounter";
    private static final String QUEUE_NAME = "MATCHED_1";
    private static final int NUM_MESSAGES_TO_SEND = 20;
    private static final long SENDING_INTERVAL_IN_MILLISECONDS = 100;

    /**
     * Determines on which messages we should call
     * {@link Message#acknowledge()}.
     */
    private static final Predicate<Message> SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE = new Predicate<Message>() {
        @Override
        public boolean test(Message m) {
            try {
                return (m.getIntProperty(COUNTER_PROPERTY_NAME) % 2) == 1;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    };

    public static void main(String[] args) throws Exception {
        SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setRespectTTL(true);
        QueueConnection queueConnection = connectionFactory.createQueueConnection();
        QueueSession queueSession = queueConnection.createQueueSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);
        Destination requestDest = queueSession.createQueue(QUEUE_NAME);
        queueSession.createConsumer(requestDest).setMessageListener(new MessageListenerThatAcknowledgesSomeMessages());
        MessageProducer messageProducer = queueSession.createProducer(requestDest);
        queueConnection.start();
        for (int counter = 1; counter <= NUM_MESSAGES_TO_SEND; counter++) {
            TextMessage msg = queueSession.createTextMessage();
            msg.setText("Message #" + counter);
            msg.setIntProperty(COUNTER_PROPERTY_NAME, counter);
            messageProducer.send(msg);
            Thread.sleep(SENDING_INTERVAL_IN_MILLISECONDS);
        }
        // Prevent the program from terminating.
        Thread.sleep(1000);
    }

    /**
     * A listener that calls {@link Message#acknowledge()} only on messages that
     * meet the criteria specified by
     * {@link SolaceAckTest#SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE}.
     */
    private static class MessageListenerThatAcknowledgesSomeMessages implements MessageListener {
        public MessageListenerThatAcknowledgesSomeMessages() {
        }

        @Override
        public void onMessage(Message msg) {
            try {
                final String text = ((TextMessage) msg).getText();
                if (SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE.test(msg)) {
                    msg.acknowledge();
                    System.out.println("Acknowledging message: " + text);
                } else {
                    System.out.println("Not acknowledging message: " + text);
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
I believe Tim is correct. In your example it looks like you're using a Solace extension rather than the standard JMS client acknowledgement mode. Please try specifying the standard JMS client acknowledgement mode when you create your session. For example (Session here is javax.jms.Session, which you would need to import):
QueueSession queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
The SOL_CLIENT_ACKNOWLEDGE extension you originally specified allows you to acknowledge a specific message without implicitly acknowledging all of the other messages received by the session. This is useful if you have multiple worker threads processing messages from the session. Each thread can acknowledge its message when it is done processing without implicitly acknowledging messages being worked on by the other threads.
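To make the difference concrete, here is a small stand-alone sketch that models the two acknowledgement behaviors with plain Java collections. It does not use JMS or the Solace API at all: ToySession, its ackAllDelivered flag, and the run helper are hypothetical names invented for this illustration, and the model assumes messages are delivered one at a time in order, as with the listener in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class AckSemanticsSketch {

    /** Hypothetical toy model of a session's delivered-but-unacknowledged messages. */
    static final class ToySession {
        // true  -> models the behavior quoted from the JMS spec (ack covers everything delivered)
        // false -> models per-message acknowledgement as described for SOL_CLIENT_ACKNOWLEDGE
        private final boolean ackAllDelivered;
        private final TreeSet<Integer> unacked = new TreeSet<>();

        ToySession(boolean ackAllDelivered) {
            this.ackAllDelivered = ackAllDelivered;
        }

        /** Records that a message has been delivered to the consumer. */
        void deliver(int id) {
            unacked.add(id);
        }

        /** Acknowledges either every delivered message or only the given one. */
        void acknowledge(int id) {
            if (ackAllDelivered) {
                unacked.clear();
            } else {
                unacked.remove(id);
            }
        }

        List<Integer> unacknowledged() {
            return new ArrayList<>(unacked);
        }
    }

    /** Delivers messages 1..numMessages and acknowledges only the odd-numbered ones. */
    static List<Integer> run(int numMessages, boolean ackAllDelivered) {
        ToySession session = new ToySession(ackAllDelivered);
        for (int counter = 1; counter <= numMessages; counter++) {
            session.deliver(counter);
            if (counter % 2 == 1) {
                session.acknowledge(counter);
            }
        }
        return session.unacknowledged();
    }

    public static void main(String[] args) {
        System.out.println("Ack covers all delivered: " + run(20, true));
        System.out.println("Ack covers one message:   " + run(20, false));
    }
}
```

In this model, the first mode leaves only message 20 unacknowledged, because each acknowledgement of an odd-numbered message also covers the even-numbered one delivered just before it. The second mode leaves all ten even-numbered messages unacknowledged, which matches what you observed on the queue.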