We are using ActiveMQ 5.16.1 with the stompit client to create a durable subscription in our NodeJS app using the following code snippet:
// Base STOMP connection settings shared by both brokers. `host` is left empty
// here and filled in per server below.
// NOTE(review): `rejectUnauthorized: false` together with a no-op
// `checkServerIdentity` disables TLS certificate and hostname verification
// whenever TLS is in use, and the credentials are hard-coded — confirm both
// are intended outside of local testing.
const connectOptions = {
    host: "",
    port: amqPort,
    rejectUnauthorized: false,
    checkServerIdentity: () => { },
    connectHeaders: {
        "heart-beat": "15000,15000", // heart-beat of 15 seconds
        login: "admin",
        passcode: "admin",
        "client-id": "agent-manager"
    }
};

// Each failover target gets its OWN copy of the options. Assigning
// `connectOptions` directly to both variables would make them aliases of one
// object, so setting the secondary host would silently overwrite the primary.
// The nested header object is copied as well so the servers share no state.
const server1 = Object.assign({}, connectOptions, {
    host: amqPrimaryHost,
    connectHeaders: Object.assign({}, connectOptions.connectHeaders)
});
const server2 = Object.assign({}, connectOptions, {
    host: amqSecondaryHost,
    connectHeaders: Object.assign({}, connectOptions.connectHeaders)
});
// Handle to the current subscription. It is assigned once a connection has
// been established and is read by unsubscribe().
let amqSubscription;

// SUBSCRIBE headers: a named (durable) subscription on the delivery-reports
// topic, with each message acknowledged individually by the client.
const subscribeHeaders = {
    destination: "/topic/delivery-reports",
    "activemq.subscriptionName": "channel_manager_delivery_reports",
    ack: "client-individual"
};

// Connect to the first reachable broker, falling back to the other one.
const connectionManager = new stompit.ConnectFailover([server1, server2], reconnectOptions);
connectionManager.connect(function (error, client, reconnect) {
    if (error) {
        logger.error("Terminal error, gave up reconnecting ", error);
        return;
    }

    // `client` is always set when this handler runs, so a `!client` guard
    // would never let the reconnect happen; reconnect on every connection error.
    client.on("error", function (clientError) {
        logger.error("Connection error. Going to reconnect", clientError);
        reconnect();
    });

    // NOTE(review): the callback neither reads nor acknowledges `message`;
    // with "client-individual" ack mode that presumably leaves messages
    // unacknowledged — confirm whether handling was omitted for brevity.
    amqSubscription = client.subscribe(subscribeHeaders, function (subscribeError, message, subscription) {
        logger.info("going to subscribe");
        if (subscribeError) {
            logger.error("Subscription failed. Going to disconnect", subscribeError);
            // The subscription argument may be absent when an error is
            // reported, so only unsubscribe when it was actually provided.
            if (subscription) {
                subscription.unsubscribe();
            }
            // reconnect();
            return; // do not fall through and report success
        }
        logger.info("subscribed");
    });
});
/**
 * Sends an UNSUBSCRIBE for the durable subscription, passing the same
 * "activemq.subscriptionName" header that was used when subscribing.
 *
 * Does nothing (apart from logging) when no subscription has been created
 * yet, instead of throwing a TypeError on the undefined handle.
 */
function unsubscribe() {
    logger.info("Going to unsub");
    if (!amqSubscription) {
        logger.error("Cannot unsubscribe: no subscription has been created yet");
        return;
    }
    amqSubscription.unsubscribe({ "activemq.subscriptionName": "channel_manager_delivery_reports" });
}
However, when I call `unsubscribe`, it only changes the subscriber's active status to false but does not remove it from the active subscribers' list, as shown in the screenshot.
I am getting the following exception in the STOMP logs:
2021-05-12 05:20:14,826 [0.1:50251@61613] WARN ProtocolConverter - Exception occurred for client agent-manager (tcp://127.0.0.1:50251) processing: UNSUBSCRIBE -> javax.jms.JMSException: Durable consumer is in use
2021-05-12 05:20:14,826 [0.1:50251@61613] DEBUG ProtocolConverter - Exception detail
javax.jms.JMSException: Durable consumer is in use
at org.apache.activemq.broker.region.TopicRegion.removeSubscription(TopicRegion.java:220)
at org.apache.activemq.broker.region.RegionBroker.removeSubscription(RegionBroker.java:457)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.advisory.AdvisoryBroker.removeSubscription(AdvisoryBroker.java:396)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.TransportConnection.processRemoveSubscription(TransportConnection.java:419)
at org.apache.activemq.command.RemoveSubscriptionInfo.visit(RemoveSubscriptionInfo.java:81)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:331)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:179)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompUnsubscribe(ProtocolConverter.java:714)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:251)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
2021-05-12 05:20:14,827 [0.1:50251@61613] TRACE ProtocolConverter - Command that caused the error: UNSUBSCRIBE
activemq.subscriptionName:channel_manager_delivery_reports
receipt:1
id:1
Any suggestions on how to properly remove the durable subscription via stompit?
You first need to disconnect the connection which the durable subscriber is using. This deactivates the subscription and will prevent the `JMSException: Durable consumer is in use` you're seeing.
Then you need to reconnect using the same `client-id` header value which you used on the `CONNECT` frame of the connection that created the subscription.
Then you need to pass the `activemq.subscriptionName` header in the `UNSUBSCRIBE` frame, just like you did for the `SUBSCRIBE` frame, e.g.:
// Pass the same subscription name that was used on SUBSCRIBE so the broker
// can identify which durable subscription to remove.
amqSubscription.unsubscribe({"activemq.subscriptionName": "channel_manager_delivery_reports"})
To be clear, this was resolved a while back via AMQ-1890. You can see the corresponding source code, which checks for the header and removes the subscription. You can also see the unit test, which subscribes, disconnects, reconnects, and then unsubscribes the durable subscription.
If you still have trouble, it may be worth turning on STOMP trace logging in the broker using these instructions to ensure the `UNSUBSCRIBE` frame is being received with the expected `activemq.subscriptionName` header.