Search code examples
node.js, activemq-classic, stomp

How to remove durable subscription using stompit


We are using ActiveMQ 5.16.1 with the stompit client to create a durable subscription in our NodeJS app using the following code snippet:

  // Connection settings shared by both brokers; the host is filled in per
  // server below.
  var connectOptions = {
    "host": "",
    "port": amqPort,
    // NOTE(review): these two options appear to switch off TLS certificate and
    // hostname verification — confirm this is intended outside development.
    rejectUnauthorized: false,
    checkServerIdentity: () => { },
    "connectHeaders": {
        "heart-beat": "15000,15000", // heart-beat of 15 seconds
        // NOTE(review): credentials are hard-coded — consider loading them from configuration.
        'login': 'admin',
        'passcode': 'admin',
        'client-id' : "agent-manager"
    }
  };

  // Each server gets its OWN copy of the shared options. Assigning
  // connectOptions directly to both variables made them aliases of one object,
  // so the second host assignment overwrote the first and both failover
  // entries pointed at the secondary host.
  // The copy is shallow: connectHeaders is still shared, which is fine as long
  // as nothing mutates it per server.
  var server1 = Object.assign({}, connectOptions, { host: amqPrimaryHost });
  var server2 = Object.assign({}, connectOptions, { host: amqSecondaryHost });
   
   // Handle of the current subscription. It is assigned once a connection has
   // been established and is read by unsubscribe().
   var amqSubscription;

   // SUBSCRIBE headers for the delivery-reports topic subscription.
   var subscribeHeaders = {
       "destination": "/topic/delivery-reports",
       "activemq.subscriptionName": "channel_manager_delivery_reports",
       "ack": "client-individual"
   };

   // Fail-over connection manager over the primary and secondary brokers.
   var connectionManager = new stompit.ConnectFailover([server1, server2], reconnectOptions);

   connectionManager.connect(function (error, client, reconnect) {
       if (error) {
           logger.error("Terminal error, gave up reconnecting ", error);
           return;
       }

       // The original guard `if (!client)` could never be true here (client is
       // the object this handler is registered on), so reconnect() was never
       // called. Reconnect whenever the connection reports an error.
       client.on("error", function (connectionError) {
           logger.error("Connection error, going to reconnect", connectionError);
           reconnect();
       });

       // NOTE(review): with "ack": "client-individual" each delivered message
       // is presumably expected to be acknowledged; this callback neither reads
       // nor acks `message` — confirm against the real handler.
       amqSubscription = client.subscribe(subscribeHeaders, function (error, message, subscription) {
           logger.info("going to subscribe")
           if (error) {
               logger.error("Subscription failed. Going to disconnect", error);
               // `subscription` may not be supplied when the callback reports
               // an error — TODO confirm; guard so we do not throw here.
               if (subscription) {
                   subscription.unsubscribe();
               }
               // Do not fall through and log success after a failure.
               return;
           }
           logger.info("subscribed")
       });
   });

   /**
    * Sends an UNSUBSCRIBE frame for the current subscription, passing the
    * "activemq.subscriptionName" header used when subscribing.
    *
    * Does nothing (beyond logging) when no subscription has been created yet,
    * instead of throwing a TypeError on the undefined handle.
    *
    * NOTE(review): the broker log reports "Durable consumer is in use" when
    * this is sent on the connection that still owns the subscription, so this
    * call alone deactivates the subscriber but does not remove it.
    */
   function unsubscribe () {
        logger.info("Going to unsub")

        if (!amqSubscription) {
            logger.error("Cannot unsubscribe: no subscription has been created yet");
            return;
        }

        amqSubscription.unsubscribe({"activemq.subscriptionName":"channel_manager_delivery_reports"})
   };

However, when I call unsubscribe, it only changes the subscriber's active status to false; it does not remove the subscriber from the active subscribers' list, as shown in the screenshot. (enter image description here)

Getting the following exception in stomp.logs.

2021-05-12 05:20:14,826 [0.1:50251@61613] WARN  ProtocolConverter              - Exception occurred for client agent-manager (tcp://127.0.0.1:50251) processing: UNSUBSCRIBE -> javax.jms.JMSException: Durable consumer is in use
2021-05-12 05:20:14,826 [0.1:50251@61613] DEBUG ProtocolConverter              - Exception detail
javax.jms.JMSException: Durable consumer is in use
    at org.apache.activemq.broker.region.TopicRegion.removeSubscription(TopicRegion.java:220)
    at org.apache.activemq.broker.region.RegionBroker.removeSubscription(RegionBroker.java:457)
    at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
    at org.apache.activemq.advisory.AdvisoryBroker.removeSubscription(AdvisoryBroker.java:396)
    at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
    at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
    at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
    at org.apache.activemq.broker.TransportConnection.processRemoveSubscription(TransportConnection.java:419)
    at org.apache.activemq.command.RemoveSubscriptionInfo.visit(RemoveSubscriptionInfo.java:81)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:331)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
    at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
    at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:179)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompUnsubscribe(ProtocolConverter.java:714)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:251)
    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)
2021-05-12 05:20:14,827 [0.1:50251@61613] TRACE ProtocolConverter              - Command that caused the error: UNSUBSCRIBE
activemq.subscriptionName:channel_manager_delivery_reports
receipt:1
id:1

Any suggestions on how to remove the durable subscription properly via stompit?


Solution

  • You first need to disconnect the connection which the durable subscriber is using. This deactivates the subscription and will prevent the JMSException: Durable consumer is in use you're seeing.

    Then you need to reconnect using the same client-id header value which you used on your CONNECT frame for the connection used to subscribe.

    Then you need to pass the activemq.subscriptionName header in the UNSUBSCRIBE frame, just as you did for the SUBSCRIBE frame, e.g.:

    // Send this only after disconnecting and then reconnecting with the same client-id.
    amqSubscription.unsubscribe({"activemq.subscriptionName": "channel_manager_delivery_reports"})
    

    To be clear, this was resolved a while back via AMQ-1890. You can see the corresponding source code which checks for the header and removes the subscription. You can also see the unit test which subscribes, disconnects, reconnects, and unsubscribes the durable subscription.

    If you still have trouble then it may be worth turning on STOMP trace logging in the broker using these instructions to ensure the UNSUBSCRIBE frame is being received with the expected activemq.subscriptionName header.