I use this script to start ActiveMQ locally:
docker run -p 61616:61616 -p 61614:61614 -p 8161:8161 -it -v conf:/opt/activemq/conf -v data:/opt/activemq/data rmohr/activemq
It starts AMQ version 5.15.6. I connect to AMQ using STOMP (v1.2) over WebSockets. Through the web console, I create two messages in my "test" queue, both with the same group, "test_grp". I start two processes, and each one runs the same logic: it subscribes to the queue with the activemq.prefetchSize: 1 and ack: client-individual headers. Both processes receive a message immediately, whereas the second message should be received by the same process, or at least only after the first one is ACKed.
Also, if I start only one process/subscription with the activemq.prefetchSize: 2 header, then this process receives both messages immediately instead of sequentially, with the second arriving only after the first one is ACKed.
So it seems that JMSXGroupID does not have any effect on how messages are processed. Is it possible that something is not configured correctly on the broker side?
I am sure the messages are not ACKed automatically, because they stay in the queue until the consumer ACKs them.
After some testing I figured out that grouping across two consumers works. However, a single consumer with activemq.prefetchSize: 2 still receives two messages from the same group immediately. Is this expected behaviour? If so, it seems that anyone who wants to process messages in order must set activemq.prefetchSize to 1 on the subscription.
Here is a piece of code to test with (Node.js 12.x; requires the @stomp/stompjs and websocket packages):
// @stomp/stompjs expects a global WebSocket implementation when run under Node.js.
Object.assign(global, { WebSocket: require('websocket').w3cwebsocket });
const { Client } = require('@stomp/stompjs');

function createClient() {
  return new Client({
    brokerURL: 'ws://localhost:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600',
  });
}

// Returns a logger that prefixes each line with the time of day and the consumer name.
function createLog(name) {
  return (...str) => console.log.apply(console, [`${new Date().toISOString().split('T')[1]} <${name}>`, ...str]);
}

function createConsumer(name) {
  const log = createLog(name);
  const client = createClient();
  client.onConnect = () => {
    log('CLIENT_CONNECTED');
    client.subscribe('/queue/test', (msg) => {
      log('RECEIVED_MESSAGE');
      // Simulate 10 seconds of processing before acknowledging.
      setTimeout(() => {
        msg.ack();
        log('ACKED_MESSAGE');
      }, 10000);
    }, {
      'activemq.prefetchSize': '2',
      ack: 'client-individual',
    });
  };
  client.activate();
}

// Publishes two messages that belong to the same message group.
function publishMessages() {
  const log = createLog();
  const client = createClient();
  client.onConnect = () => {
    client.publish({
      destination: '/queue/test',
      headers: {
        persistent: true,
        JMSXGroupID: 'grp',
      },
      body: 'test message 1',
    });
    client.publish({
      destination: '/queue/test',
      headers: {
        persistent: true,
        JMSXGroupID: 'grp',
      },
      body: 'test message 2',
    });
  };
  client.activate();
}

createConsumer('A');
createConsumer('B');
// Give both consumers time to connect before publishing.
setTimeout(() => {
  publishMessages();
}, 2000);
Output:
22:50:03.196Z <B> CLIENT_CONNECTED
22:50:03.199Z <A> CLIENT_CONNECTED
22:50:05.195Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:05.198Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:15.196Z <B> ACKED_MESSAGE
22:50:15.198Z <B> ACKED_MESSAGE
Here are the headers received by the STOMP client (there is no group info at all):
headers: {
  timestamp: '1580796287376',
  persistent: 'true',
  'message-id': 'ID:04f803c080ac-46709-1580756713304-3:66:-1:1:2',
  priority: '4',
  subscription: 'sub-0',
  ack: 'ID:04f803c080ac-46709-1580756713304-70:2',
  destination: '/queue/test',
  expires: '0',
  'content-length': '14'
},
This is strange because it seems that JMSXGroupID does not guarantee processing order within a group. It only guarantees that messages will be delivered to the same consumer in the same order, but message B can be delivered to the consumer before it ACKs message A (or even "starts" processing A), assuming the consumer has activemq.prefetchSize > 1. I don't understand this: since the consumer uses ack: client-individual mode, the broker cannot treat a message as delivered until the consumer sends an ACK. So why does the broker send a message from a group when it does not know whether the previous message from the same group was delivered? Is it possible to configure the ActiveMQ broker to prevent this?
Another solution would be to receive the JMSXGroupID header along with each message from the broker, which does not work (at least not over STOMP). In that case the consumer could keep local mini-queue(s) to put messages from the same group in the correct order.
My use case is a queue that serves as the input channel for a microservice (/queue/my-service). Other microservices will send messages/events/commands to it. Some of them may have a JMSXGroupID, others may not. What I want is a single subscription with some concurrency (activemq.prefetchSize > 1). So if 6 messages (A1, A2, A3, B, C, D) are received "at once", where three of them are from the same group (A1, A2, A3) and the others are in different groups or in no group at all, then the consumer should process the groups in parallel while handling A1, A2, A3 in order.
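The client-side ordering this use case asks for could be sketched roughly as follows. This is only an illustration under assumptions: GroupSerializer is a hypothetical helper, not part of @stomp/stompjs, and because JMSXGroupID does not arrive in the received STOMP headers, the group key would have to come from somewhere else. The usage comment assumes a hypothetical custom header named app-group-id that the producer sets alongside JMSXGroupID; whether the broker passes such a header through should be verified.

```javascript
// Runs tasks that share a group key one after another, while tasks from
// different groups (or with no group at all) run concurrently.
class GroupSerializer {
  constructor() {
    // group key -> promise of the most recently queued task for that group
    this.tails = new Map();
  }

  run(groupKey, task) {
    if (groupKey == null) {
      // Ungrouped work has no ordering constraint.
      return Promise.resolve().then(task);
    }
    const previous = this.tails.get(groupKey) || Promise.resolve();
    // Start once the previous task of this group has settled, even if it failed.
    const current = previous.catch(() => {}).then(task);
    this.tails.set(groupKey, current);
    // Forget the group once its last queued task is done, so the map does not grow forever.
    const cleanup = () => {
      if (this.tails.get(groupKey) === current) this.tails.delete(groupKey);
    };
    current.then(cleanup, cleanup);
    return current;
  }
}

// Hypothetical usage inside a subscribe callback (handle is your own async function):
//   const serializer = new GroupSerializer();
//   client.subscribe('/queue/my-service', (msg) => {
//     serializer
//       .run(msg.headers['app-group-id'], () => handle(msg))
//       .then(() => msg.ack());
//   }, { 'activemq.prefetchSize': '6', ack: 'client-individual' });
```

With this shape A1, A2, A3 would run strictly in sequence, while B, C, D start immediately. A failed task does not block later tasks in its group; whether that is the right policy depends on the service.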
I reported a bug (it looks like one, as @Justin Bertram mentioned) regarding the missing JMSXGroupID header in received messages here: https://issues.apache.org/jira/browse/AMQ-7395
You are seeing the expected behavior. If you set activemq.prefetchSize to a value greater than 1, then the broker will dispatch more than 1 message to the client at a time. Since stompjs invokes the callback function you pass to subscribe whenever it receives a message, you'll have to control acknowledgement order yourself or simply set activemq.prefetchSize to 1.
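One way to "control acknowledgement order yourself" might look like the sketch below. It is an assumption-laden illustration, not the library's prescribed approach: serialHandler is a hypothetical helper that chains every incoming message onto a single promise, so that even with a prefetch size above 1 the messages are handled one at a time and each is ACKed only after its handler finishes. It relies only on msg.ack(), which the question's code already uses.

```javascript
// Wraps an async handler so that messages are processed strictly one at a
// time, in arrival order, and each message is ACKed only after its handler
// has completed successfully.
function serialHandler(handle) {
  let tail = Promise.resolve();
  return (msg) => {
    tail = tail
      .then(() => handle(msg))
      .then(() => msg.ack())
      // A failed message is left un-ACKed here; later messages still proceed.
      .catch((err) => console.error('processing failed, message not ACKed:', err));
    return tail;
  };
}

// Hypothetical usage with the subscription from the question:
//   client.subscribe('/queue/test', serialHandler(async (msg) => {
//     /* process msg.body */
//   }), { 'activemq.prefetchSize': '2', ack: 'client-individual' });
```

What to do on failure (skip, retry, or stop consuming) is a design choice this sketch deliberately leaves open.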
To address some of your specific points...
...JMSXGroupID does not guarantee processing order within a group. It only guarantees that messages will be delivered to the same consumer in the same order...
That's exactly correct. Grouping messages with the JMSXGroupID header only guarantees that messages in the same group will be delivered to a particular consumer in the order in which they were received by the broker. Once that is done, the acknowledgement order is up to the client itself.
Generally speaking it is assumed that message processing concurrency comes from multiple concurrent consumers rather than a single consumer (or a set of consumers) which processes messages concurrently itself. In this general case delivering grouped messages to a single consumer is enough to guarantee order since it will be processing messages serially.
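As a rough sketch of that general case, concurrency can come from several consumers that each handle one message at a time. The helper below is hypothetical: startConsumers is an illustrative name, createClient is the factory from the question, and the destination and headers simply mirror the question's code with activemq.prefetchSize set to 1. How the broker assigns groups to consumers is not shown and should be checked against your setup.

```javascript
// Starts `count` independent consumers. Each one subscribes with a prefetch
// size of 1 and ACKs a message only after `handle` has finished, so each
// consumer works through its messages serially.
function startConsumers(createClient, count, handle) {
  const clients = [];
  for (let i = 0; i < count; i++) {
    const client = createClient();
    client.onConnect = () => {
      client.subscribe('/queue/test', async (msg) => {
        await handle(msg);
        msg.ack();
      }, {
        'activemq.prefetchSize': '1',
        ack: 'client-individual',
      });
    };
    client.activate();
    clients.push(client);
  }
  return clients;
}

// Hypothetical usage:
//   startConsumers(createClient, 3, async (msg) => { /* process msg.body */ });
```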
So why does the broker send a message from a group when it does not know whether the previous message from the same group was delivered?
The broker sends multiple messages from the same group because you've asked it to by setting activemq.prefetchSize to a value greater than 1.