I am using JMS to process messages in a Java 1.8 SE environment. The messages originate from an Oracle Advanced Queue. Because it may take a while to process a message, I decided to have a pool of 5 worker threads (the MessageHandler objects), so that more than one thread could be processing messages at once. I would like to have guaranteed delivery with no duplicate message delivery.
I use
queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
to create the QueueSession. I use the code below to process incoming messages. Basically, onMessage spawns a thread that processes a message.
    public class JmsQueueListener implements MessageListener
    {
        /** A pool of worker threads for handling requests. */
        private final ExecutorService pool;
        OracleJmsQueue queue;

        public void onMessage(Message msg)
        {
            pool.execute(new MessageHandler(msg));
            // can't commit here - the thread may still be processing
        }

        /**
         * This class provides a "worker thread" for processing a message
         * from the queue.
         */
        private class MessageHandler implements Runnable {
            /**
             * The message to process
             */
            Message message;

            /**
             * The constructor stores the passed in message as a field
             */
            MessageHandler(Message message) {
                this.message = message;
            }

            /**
             * Processes the message provided to the constructor by
             * calling the appropriate business logic.
             */
            public void run() {
                QueueSession queueSession = queue.getQueueSession();
                try {
                    String result = requestManager.processMessage(message);
                    if (result != null) {
                        queueSession.commit();
                    }
                    else {
                        queueSession.rollback();
                    }
                }
                catch (Exception ex) {
                    try {
                        queueSession.rollback();
                    }
                    catch (JMSException e) {
                        // ignored: the rollback itself failed
                    }
                }
            }
        } // class MessageHandler
    } // class JmsQueueListener
My problem is that I don't know how to indicate to the originating queue whether or not processing has completed successfully. I can't commit at the end of onMessage, because the thread may not have completed processing. I don't think that where I currently have the commits and rollbacks is any good either. For example, if the 5 worker threads are in various states of completion, what is the state of the queue session being committed?
I think I must be missing some fundamental concept on how to handle JMS in a multi-threaded fashion. Any help would be much appreciated.
You are using asynchronous message processing, so unless you implement a way to ensure that messages are processed in chronological order, you will end up in a scenario where the processing of an older message finishes after the processing of a more recent one. So why use a messaging service?
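To make the ordering point concrete, here is a minimal sketch that uses only java.util.concurrent and no JMS at all. The class name CompletionOrderSketch, the labels "older" and "newer", and the latch that stands in for slow processing of the older message are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CompletionOrderSketch {

    /** Submits "older" before "newer" and returns the order in which they finish. */
    static List<String> completionOrder() {
        List<String> finished = new CopyOnWriteArrayList<>();
        CountDownLatch newerDone = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Submitted first, but its "processing" lasts until the newer message is done.
        pool.execute(() -> {
            try {
                newerDone.await();
                finished.add("older");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        // Submitted second, finishes immediately.
        pool.execute(() -> {
            finished.add("newer");
            newerDone.countDown();
        });

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [newer, older]
    }
}
```

With a single-threaded executor the tasks would run strictly one after another in submission order, at the cost of the parallelism the pool was meant to give.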
A simple solution to your problem is to commit at the end of the onMessage method and, in the body of your MessageHandler, re-enqueue the message in case of an error. However, this solution has a weakness: the re-enqueue itself can fail.
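A rough sketch of that commit-then-re-enqueue flow is below. It is not JMS code: a LinkedBlockingQueue stands in for the Oracle queue so the example runs on a plain JDK, and taking a message off it plays the role of the commit at the end of onMessage. The names ReenqueueSketch, Msg and consume, and the maxAttempts cap (assumed to be at least 1), are assumptions of mine and not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class ReenqueueSketch {

    /** Hypothetical message: a payload plus the number of the current delivery attempt. */
    static final class Msg {
        final String body;
        final int attempt;

        Msg(String body, int attempt) {
            this.body = body;
            this.attempt = attempt;
        }
    }

    /**
     * Processes every body with a pool of 5 workers. The handler returns true on success;
     * a failed message is put back on the queue until maxAttempts is reached.
     * Returns the bodies that were processed successfully (in completion order).
     */
    static List<String> consume(List<String> bodies, Predicate<Msg> handler, int maxAttempts) {
        BlockingQueue<Msg> queue = new LinkedBlockingQueue<>(); // stand-in for the real queue
        for (String body : bodies) {
            queue.add(new Msg(body, 1));
        }
        AtomicInteger outstanding = new AtomicInteger(bodies.size());
        List<String> processed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            while (outstanding.get() > 0) {
                // Removing the message is final, like committing at the end of onMessage.
                Msg msg = queue.poll(50, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    continue; // workers are still busy and may re-enqueue something
                }
                pool.execute(() -> {
                    boolean ok;
                    try {
                        ok = handler.test(msg);
                    } catch (RuntimeException ex) {
                        ok = false;
                    }
                    if (ok) {
                        processed.add(msg.body);
                        outstanding.decrementAndGet();
                    } else if (msg.attempt < maxAttempts) {
                        queue.add(new Msg(msg.body, msg.attempt + 1)); // re-enqueue on error
                    } else {
                        outstanding.decrementAndGet(); // give up after maxAttempts
                    }
                });
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) {
        // "b" fails on its first attempt only, so it is re-enqueued once.
        List<String> done = consume(Arrays.asList("a", "b", "c"),
                m -> !(m.body.equals("b") && m.attempt == 1), 3);
        System.out.println(done.size() + " messages processed");
    }
}
```

Note where the weak spot sits: the message has already left the queue by the time the worker runs, so if the worker dies before it reaches the re-enqueue line, that message is gone. With a real provider the re-enqueue would be a send to the queue, which can fail too.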