I have a JMS application that reads from a JBoss queue. My class implements MessageListener and uses onMessage() to receive messages:
public class JBossConnector implements MessageListener, AutoCloseable {}
Here is my method:
/**
 * The listener method of JMS. It listens to messages from queue: 'jbossToAppia'
 * If the message is of type MessageObject, then transfer that to Appia
 *
 * @param message JMS Message
 */
@Override
public void onMessage(Message message) {
    // receive the message from jboss queue: 'jbossToAppia'
    // then post it to appia
    if (message instanceof ObjectMessage) {
        try {
            MessageObject messageObject = (MessageObject) ((ObjectMessage) message).getObject();
            System.out.printf("JbossConnector: MessageObject received from JBOSS, %s\n", messageObject.getMessageType());
            component.onMessageFromJboss(properties.getProperty("target.sessionID"), messageObject);
        } catch (MessageFormatException exception) {
            logger.error(ExceptionHandler.getFormattedException(exception));
            ExceptionHandler.printException(exception);
        } catch (JMSException exception) {
            ExceptionHandler.printException(exception);
            restart();
        }
    } else {
        System.out.printf("%s: MessageFormatException(Message is not of the format MessageObject)\n", this.getClass().getSimpleName());
    }
}
Whenever I catch a JMSException, I try to restart the JBoss connection (Context, Connection, Session, Receiver, Sender). My doubt is this: I've read that onMessage() uses multiple threads to receive messages from the queue (correct me if I'm wrong). When the JBoss queue connection is severed, at least some of those threads will throw this exception. That means they will all try to restart() the connection, which is a waste of time (restart() first closes all the connections, sets the variables to null, and then attempts to re-initiate the connections).
Now I could do something like
synchronized (this) {
    restart();
}
or use volatile variables. But that would not guarantee that other threads won't attempt to restart() once the current thread finishes its restart() operation (again, correct me if I'm wrong).
Is there any solution to make this work?
The onMessage() of a MessageListener is indeed run from its own thread, so you'll need proper concurrency controls. I think the simplest solution would just be to use a java.util.concurrent.atomic.AtomicBoolean. For example, in your restart() method you could do something like this:
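// Declared as a field so that every thread shares the same flag;
// a local variable would be re-created as false on each call.
private final AtomicBoolean restarting = new AtomicBoolean(false);

private void restart() {
    // getAndSet returns the previous value, so only the first caller sees false.
    if (!restarting.getAndSet(true)) {
        // restart connection, session, etc.
        // call restarting.set(false) afterwards if a later failure should be able to trigger another restart
    }
}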
This will make the restart() method effectively idempotent. Multiple threads will be able to call restart(), but only the first thread that calls it will actually cause the resources to be re-created. All other calls will return immediately.
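If you want to convince yourself of this behaviour, here is a minimal, self-contained sketch rather than a drop-in implementation. The class name RestartGuardDemo, the restartCount counter, and the runDemo helper are all hypothetical and exist only for the demonstration; the counter stands in for the real work of re-creating the connection and session. It assumes the flag is held in a field shared by all threads, and it uses compareAndSet, which has the same effect here as checking the result of getAndSet(true).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class RestartGuardDemo {
    // Shared by all threads, so it must be a field rather than a local variable.
    private final AtomicBoolean restarting = new AtomicBoolean(false);
    // Hypothetical counter: records how often the "expensive" restart work ran.
    private final AtomicInteger restartCount = new AtomicInteger();

    void restart() {
        // compareAndSet(false, true) succeeds for exactly one caller.
        if (restarting.compareAndSet(false, true)) {
            // Stand-in for closing and re-creating connection, session, etc.
            restartCount.incrementAndGet();
        }
    }

    int restartCount() {
        return restartCount.get();
    }

    // Fires restart() from several threads at once and reports how many
    // of those calls actually performed the restart work.
    static int runDemo(int threads) throws InterruptedException {
        RestartGuardDemo connector = new RestartGuardDemo();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await();       // line all threads up first
                    connector.restart(); // simulate a JMSException handler
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown(); // release all threads together
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return connector.restartCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("restarts performed: " + runDemo(8));
    }
}
```

Running main should report a single restart no matter how many threads call restart(). Note that in this sketch the flag is never cleared, so a connection failure that happens later would not trigger a second restart; whether and when to reset it depends on how your connector should behave after a successful reconnect.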