activemq-classic stomp stomp.py

How to force disconnect from ActiveMQ connection with Stomp.py


When listening to a message queue using a durable connection, I get an error in the listener. I simulate this by hitting CTRL-Z to quit the program. Trying to reconnect then gives me an error that says:

on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:197)
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
        at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
        at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
        at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:70)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.lang.Thread.run(Thread.java:745)
"

I have tried to unsubscribe and disconnect my connection using the method below but it won't disconnect me.

import stomp


class MyListener(stomp.ConnectionListener):
    """This is a listener class that listens for new messages using the STOMP protocol"""

    def __init__(self, conn, client_id):
        self.conn = conn
        # client_id is also used as the subscription id when unsubscribing
        self.client_id = client_id

    def on_error(self, headers, message):
        self.disconnect()

    def on_message(self, headers, message):
        ...

    def on_disconnected(self):
        self.disconnect()

    def disconnect(self):
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except Exception:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except Exception:
            print('disconnect failed')

How can I force the AMQ server to forget my previous connection?


Solution

  • You cannot force the broker to release resources held by another connection. Instead, configure an idle timeout on both the client and the broker, so that each side notices when the remote end has dropped even if the socket never reports the close.

    You can also configure the broker's transport connector with heart-beat grace period values for STOMP clients that don't advertise heart-beats; refer to the ActiveMQ broker STOMP documentation.
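
    On the client side, a minimal sketch of this with stomp.py might look like the following. It assumes a STOMP 1.1+ connection, which is where heart-beats are available. The function names, the 10-second intervals, and the credentials are illustrative assumptions, not values from the question, and the SSL setup the original connection uses is left out. The small helper reflects the negotiation rule as I understand it from the STOMP specification: heart-beats flow in a given direction only if both sides offer a non-zero value, at the larger of the two intervals.

```python
def negotiated_interval_ms(sender_ms, receiver_ms):
    """Return the effective heart-beat interval for one direction.

    sender_ms:   what the sending side says it can guarantee (0 = cannot send)
    receiver_ms: what the receiving side says it wants (0 = does not want any)
    A result of 0 means no heart-beats flow in that direction.
    """
    if sender_ms == 0 or receiver_ms == 0:
        return 0
    return max(sender_ms, receiver_ms)


def connect_with_heartbeats(host, port, client_id, username, passcode,
                            heartbeats=(10000, 10000)):
    """Open a stomp.py connection that advertises heart-beats (hypothetical helper).

    heartbeats is (send_ms, receive_ms) from the client's point of view.
    """
    import stomp  # third-party package: pip install stomp.py

    conn = stomp.Connection(
        host_and_ports=[(host, port)],
        heartbeats=heartbeats,
    )
    # The client-id header is what ActiveMQ ties the durable connection to.
    conn.connect(username, passcode, wait=True,
                 headers={"client-id": client_id})
    return conn
```

    With heart-beats negotiated, a client that disappears without closing its socket stops sending them, which gives the broker a way to detect the dead connection and drop it. Only after that can the same client-id connect again.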

    The STOMP specification outlines how heart beats work: