We have an on-premises installation of Service Bus, and I noticed strange behavior around redelivery. I created a sample application (see below). It posts 5 messages to a topic with a persistent queue and then tries to read all of them. For each message it creates a new session and starts/stops message delivery (the timeouts are needed for a graceful start/stop of the Qpid background thread). I assume that prefetch leaves the messages that were fetched but not read in a peek-locked state, so I expect to eventually get all messages once the locks expire. Instead, some messages are lost. Max delivery count is set to 10. I examined the queue with Service Bus Explorer: it is empty after the test, and the missing messages are not in the dead-letter queue.
This is the test that reproduces the behavior (it is not how one should normally consume messages):
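To make that expectation concrete, here is a toy in-memory model of the semantics I assumed. It is not Service Bus or Qpid code: `PeekLockQueue`, its methods, the 60-second lock duration, and the idea that prefetch locks every visible message are all hypothetical and only encode the assumption. Under this model a message can end up acknowledged or dead-lettered, but it can never simply vanish.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of peek-lock delivery (hypothetical, NOT the Service Bus implementation). */
class PeekLockQueue {
    private static final class Entry {
        final String body;
        int deliveryCount = 0;
        long lockedUntil = 0; // the message is invisible while now < lockedUntil
        Entry(String body) { this.body = body; }
    }

    private final long lockDurationMs;
    private final int maxDeliveryCount;
    private final List<Entry> active = new ArrayList<>();
    final List<String> deadLetter = new ArrayList<>();

    PeekLockQueue(long lockDurationMs, int maxDeliveryCount) {
        this.lockDurationMs = lockDurationMs;
        this.maxDeliveryCount = maxDeliveryCount;
    }

    void send(String body) { active.add(new Entry(body)); }

    /** Locks and returns the first visible message, or null if nothing is visible at time now. */
    String receive(long now) {
        Iterator<Entry> it = active.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (now < e.lockedUntil) continue;          // still locked by an earlier receive
            if (e.deliveryCount >= maxDeliveryCount) {  // delivery attempts exhausted
                it.remove();
                deadLetter.add(e.body);
                continue;
            }
            e.deliveryCount++;
            e.lockedUntil = now + lockDurationMs;
            return e.body;
        }
        return null;
    }

    /** Removes the message for good; bodies are assumed to be unique in this sketch. */
    void acknowledge(String body) { active.removeIf(e -> e.body.equals(body)); }

    int pending() { return active.size(); }

    /** One session per message: prefetch locks everything visible, only the first is acknowledged. */
    static List<String> simulate(int messages, long lockMs, int maxDelivery) {
        PeekLockQueue q = new PeekLockQueue(lockMs, maxDelivery);
        for (int i = 0; i < messages; i++) q.send(Integer.toString(i));
        List<String> acked = new ArrayList<>();
        long now = 0;
        while (q.pending() > 0) {
            String first = q.receive(now);
            if (first != null) {
                while (q.receive(now) != null) { /* prefetched and locked, never acknowledged */ }
                q.acknowledge(first);
                acked.add(first);
            }
            now += lockMs; // wait until every lock has expired
        }
        return acked;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, 60_000, 10)); // prints [0, 1, 2, 3, 4]
    }
}
```

With 5 messages and a max delivery count of 10, the last message is delivered 5 times at most, so in this model every message is eventually acknowledged and the dead-letter list stays empty.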

    private static final long RECEIVE_TIMEOUT_MS = 5000;
    private static final long SLEEP_BETWEEN_SESSIONS_MS = 1000;
    private static final long SLEEP_PEEK_TIMEOUT_MS = 70000;
    private static final int MAX_SUBSEQUENT_FAILURES = 3;
    private static final int MESSAGES_TO_TEST = 5;

    // send some messages to empty queue
    for (int i = 0; i < MESSAGES_TO_TEST; i++) {
        testSendToTopic(connection, context, Integer.toString(i));
        Thread.sleep(SLEEP_BETWEEN_SESSIONS_MS);
    }
    // wait for message
    OUTER:
    for (int i = 0; i < MESSAGES_TO_TEST; i++) {
        Thread.sleep(SLEEP_BETWEEN_SESSIONS_MS);
        int subsequentFailures = 0;
        while (!testReceiveFromQueue(connection, context)) {
            log.info("Waiting for peek lock timeout");
            Thread.sleep(SLEEP_PEEK_TIMEOUT_MS);
            subsequentFailures++;
            if (subsequentFailures > MAX_SUBSEQUENT_FAILURES) {
                log.info("Giving up");
                break OUTER;
            }
        }
    }

This is the log; messages 1, 2 and 3 were lost:
19:05:51,012 [ main] INFO [Qpid] Message sent, id: 0
19:05:52,039 [ main] INFO [Qpid] Message sent, id: 1
19:05:53,055 [ main] INFO [Qpid] Message sent, id: 2
19:05:54,074 [ main] INFO [Qpid] Message sent, id: 3
19:05:55,088 [ main] INFO [Qpid] Message sent, id: 4
19:05:57,131 [ main] INFO [Qpid] Received message, id: 0, redelivered: false
19:05:57,133 [ main] INFO [Qpid] Message acknowledged
19:06:03,342 [ main] INFO [Qpid] Queue is empty
19:06:03,345 [ main] INFO [Qpid] Waiting for peek lock timeout
19:07:13,358 [ main] INFO [Qpid] Received message, id: 4, redelivered: true
19:07:13,359 [ main] INFO [Qpid] Message acknowledged
19:07:19,367 [ main] INFO [Qpid] Queue is empty
19:07:19,370 [ main] INFO [Qpid] Waiting for peek lock timeout
19:08:34,379 [ main] INFO [Qpid] Queue is empty
19:08:34,381 [ main] INFO [Qpid] Waiting for peek lock timeout
19:09:49,400 [ main] INFO [Qpid] Queue is empty
19:09:49,402 [ main] INFO [Qpid] Waiting for peek lock timeout
19:11:04,417 [ main] INFO [Qpid] Queue is empty
19:11:04,419 [ main] INFO [Qpid] Waiting for peek lock timeout
19:12:14,423 [ main] INFO [Qpid] Giving up
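The lost ids can be read off the log mechanically: every id that was logged as sent but never logged as received. A minimal sketch, assuming the two log message formats shown above; the `LostMessages` class and `lostIds` method are hypothetical helpers, not part of the test application.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LostMessages {
    private static final Pattern SENT = Pattern.compile("Message sent, id: (\\d+)");
    private static final Pattern RECEIVED = Pattern.compile("Received message, id: (\\d+)");

    /** Returns the ids logged as sent but never logged as received, in send order. */
    static Set<String> lostIds(List<String> logLines) {
        Set<String> lost = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher sent = SENT.matcher(line);
            if (sent.find()) lost.add(sent.group(1));
        }
        for (String line : logLines) {
            Matcher received = RECEIVED.matcher(line);
            if (received.find()) lost.remove(received.group(1));
        }
        return lost;
    }

    public static void main(String[] args) {
        // The sent/received lines from the log above.
        List<String> log = Arrays.asList(
            "19:05:51,012 [ main] INFO [Qpid] Message sent, id: 0",
            "19:05:52,039 [ main] INFO [Qpid] Message sent, id: 1",
            "19:05:53,055 [ main] INFO [Qpid] Message sent, id: 2",
            "19:05:54,074 [ main] INFO [Qpid] Message sent, id: 3",
            "19:05:55,088 [ main] INFO [Qpid] Message sent, id: 4",
            "19:05:57,131 [ main] INFO [Qpid] Received message, id: 0, redelivered: false",
            "19:07:13,358 [ main] INFO [Qpid] Received message, id: 4, redelivered: true");
        System.out.println(lostIds(log)); // prints [1, 2, 3]
    }
}
```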
Missing methods (EDITED, simplified code):

    // The caller passes the message id as a third argument; in this simplified version it is only logged.
    static void testSendToTopic(Connection connection, Context context, String id)
            throws JMSException, NamingException {
        Session session = null;
        MessageProducer messageProducer = null;
        try {
            session = connection.createSession(false/*transacted*/, Session.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic) context.lookup("ORDER_HISTORY_TOPIC");
            messageProducer = session.createProducer(topic);
            TextMessage message = session.createTextMessage("Hello MS SB");
            message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
            messageProducer.send(message);
            log.info("Message sent, id: " + id);
        } finally {
            if (null != messageProducer)
                messageProducer.close();
            if (null != session)
                session.close();
        }
    }

    static boolean testReceiveFromQueue(Connection connection, Context context)
            throws JMSException, NamingException {
        Session session = null;
        MessageConsumer consumer = null;
        try {
            session = connection.createSession(false/*transacted*/, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = (Queue) context.lookup("ORDER_HISTORY_QUEUE");
            consumer = session.createConsumer(queue);
            // start delivery of incoming messages, otherwise receiveXXX will not get any
            connection.start();
            // even when there are messages, receiveNoWait may return null,
            // so use a blocking receive with a timeout instead
            Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
            if (null == message) {
                log.info("Nothing to receive");
                return false;
            }
            log.info("Received message");
            // must be acknowledged before the peek lock expires (see Lock Duration);
            // until the peek lock expires, the message will not be delivered
            // to other receivers on the same subscription
            message.acknowledge();
            log.info("Acknowledged");
            return true;
        } finally {
            connection.stop();
            if (consumer != null)
                consumer.close();
            if (null != session)
                session.close();
        }
    }

This is fixed in Qpid trunk (thanks, Rob): https://issues.apache.org/jira/browse/QPID-5570