I have worker threads that pick up messages from different provider classes. Each provider class adds/takes messages of an internal queue. Each provider caters to only one solace queue and the solace consumer adds messages to the provider of the queue.
Multiple workers can take messages of a provider, process them and then send an ack for the message (the message.commit() method below does the ack).
Scenario
QUESTION
Provider class
public abstract class BaseProvider implements IProvider {
private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();
@Override
public synchronized List<CoreMessage> getNextQueuedItem() {
List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();
if (internalQueue.size() > 0) {
Logger.debug("Queue has entries");
CoreMessage msg = null;
try {
msg = internalQueue.take();
} catch (InterruptedException e) {
Logger.warn("Interruption");
e.printStackTrace();
}
if (msg != null) {
arrMessages.add(msg);
}
}
return arrMessages;
}
protected synchronized void addToQueue(CoreMessage message) {
try {
internalQueue.put(message);
} catch (InterruptedException e) {
Logger.error("Exception adding message to queue " + message);
}
}
}
// There are a set of worker threads that read through these queues
public class Worker implements Runnable
@Override
public void run() {
Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());
while (!stopRequested) {
boolean processedMessage = false;
for (IProvider provider : providers) {
List<CoreMessage> messages = provider.getNextQueuedItem();
if (messages == null || messages.size() != 0) {
processedMessage = true;
for (CoreMessage message : messages) {
final Message msg = createEndurMessage(provider, message);
processMessage(msg);
message.commit();
}
}
}
if (!(processedMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
This appear to be your custom wrapper around the Solace API. This make it very difficult to provide answers to your question because we simply don't know what is this wrapper doing.
The answers below make the following assumptions.
The wrapper is using a non-transacted JCSMPSession
Client acknowledgement is in use
message.commit()
is actually calling Solace's XMLMessage.ackMessage()
- Would message2 still sit on the solace queue and wait for message1 to be acked or would message2 be popped off the queue despite message1 not acked yet?
Message2 will be removed.
- What happens on the solace hardware when the ack is received? Is the message2 removed completely, how is the queue order then maintained?
Message2 will be acknowledged and removed from the queue.
There's no effect on queue order. Order of messages refer to the order in which incoming messages are delivered to the consuming application. In this case, message1, message2 and message3 was delivered to the consuming application in order.