Search code examples
c#ibm-mqxms

Reconnecting to IBM MQ Queue on connection failure


The following code snippet contains my connection and subscription logic for an IBM MQ queue. Whenever there is a connection failure, I use the IConnection.ExceptionListener delegate to establish a new connection to my queue and resubscribe for messages. The problem is that I can see multiple queue handles. How can I make sure I close the previous connection handle and establish a new connection whenever the connection breaks due to network issues or MQ server restarts?

// State shared by the connection and subscription methods of this snippet.
private CancellationToken _cancellationToken; // not referenced in this snippet
private IConnection _connection; // assigned in CreateWebsphereQueueConnection, started in Subscribe
private IConnectionFactory _connectionfactory; // factory CreateWebsphereQueueConnection creates the connection from
private IMessageConsumer _consumer; // created in CreateWebsphereQueueConnection; Subscribe attaches the message listener to it
private IDestination _destination; // queue destination created in CreateWebsphereQueueConnection
private MessageFormat _msgFormat; // not referenced in this snippet
private IMessageProducer _producer; // not referenced in this snippet
private ISession _session; // created from _connection in CreateWebsphereQueueConnection

/// <summary>
/// Builds the XMS object chain used to read from the queue (connection,
/// session, queue destination and message consumer) and stores each object
/// in the corresponding private field.
/// </summary>
/// <remarks>
/// The connection is not started here (Subscribe does that), and nothing in
/// this method closes a connection created by an earlier call.
/// </remarks>
private void CreateWebsphereQueueConnection () {
    // Keep the configured factory. SetConnectionFactory only returns it, so
    // discarding the result left _connectionfactory unassigned for the
    // CreateConnection call below.
    _connectionfactory = SetConnectionFactory ();

    //Connection
    _connection = _connectionfactory.CreateConnection (null, null);
    _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

    //Session (non-transacted, auto-acknowledge)
    _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

    //Destination
    _destination = _session.CreateQueue ("queue://My.Queue.Name");
    // NOTE(review): 2 is presumably the persistent delivery mode and 0 the
    // default target-client setting - confirm against the XMSC constants.
    _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
    _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

    //Consumer
    _consumer = _session.CreateConsumer (_destination);
}

/// <summary>
/// Creates an XMS connection factory for the <c>XMSC.CT_WMQ</c> connection
/// type and configures it from <c>ConnectionSettings</c>.
/// </summary>
/// <returns>The configured factory. It is only returned, not stored in a field.</returns>
private IConnectionFactory SetConnectionFactory () {
    IConnectionFactory connectionFactory = XMSFactoryFactory
        .GetInstance (XMSC.CT_WMQ)
        .CreateConnectionFactory ();

    // Channel, client mode and quiesce behaviour.
    connectionFactory.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    connectionFactory.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    connectionFactory.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);

    // Queue manager and the list of connection names to try.
    connectionFactory.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    connectionFactory.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

    // Client reconnect settings.
    connectionFactory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    connectionFactory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    // Provider version and syncpoint behaviour for gets.
    connectionFactory.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    connectionFactory.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

    return connectionFactory;
}

/// <summary>
/// Installs an exception listener that rebuilds the connection and
/// subscribes again, attaches the message listener to the consumer, and
/// starts the connection.
/// </summary>
/// <typeparam name="T">Type of the value handed to <paramref name="onMessageReceived"/>.</typeparam>
/// <param name="onMessageReceived">Callback invoked from the message listener.</param>
public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {
        // Author's note: calling _connection.Stop() or _connection.Close() at
        // the top of this handler terminated the code; the debugger never
        // reached the CreateWebsphereQueueConnection() line.
        _connection.ExceptionListener = connectionException => {
            CreateWebsphereQueueConnection ();
            Subscribe (onMessageReceived);
        };

        // NOTE(review): "message" is not defined in this snippet; presumably it
        // is the payload extracted from msg - confirm against the full source.
        _consumer.MessageListener = new MessageListener (msg => onMessageReceived (message));

        // Start message delivery.
        _connection.Start ();
    } catch (Exception) {
        //Log exception details
    }
}

Solution

  • IBM.XMS.dll will take care of MQ failover, or of restarts done with the -r switch. But if the queue manager is restarted without asking the connected clients to reconnect, the XMS library will not attempt to reconnect, and client applications have to handle this situation manually, as pointed out by @Shashi and @JoshMc.

    I had to handle this situation and changing my Connection ExceptionListener as follows helped me:

    // State shared by the connection and subscription methods of this snippet.
    private CancellationToken _cancellationToken; // not referenced in this snippet
    private IConnection _connection; // assigned in CreateWebsphereQueueConnection, started in Subscribe
    private IConnectionFactory _connectionfactory; // factory CreateWebsphereQueueConnection creates the connection from
    private IMessageConsumer _consumer; // created in CreateWebsphereQueueConnection; Subscribe attaches the message listener to it
    private IDestination _destination; // queue destination created in CreateWebsphereQueueConnection
    private MessageFormat _msgFormat; // not referenced in this snippet
    private IMessageProducer _producer; // not referenced in this snippet
    private ISession _session; // created from _connection in CreateWebsphereQueueConnection
    // Set to true by the exception listener in Subscribe while it rebuilds the
    // connection and reset to false afterwards; read by CreateWebsphereQueueConnection.
    private bool _reConnectOnConnectionBreak = false;
    // Set to true once CreateWebsphereQueueConnection has built the whole
    // connection chain, and to false when an attempt fails.
    private bool _connected = false;
    /// <summary>
    /// Creates the connection, session, queue destination and consumer,
    /// retrying until one attempt succeeds. Does nothing when a connection
    /// already exists and no reconnect has been requested.
    /// </summary>
    /// <remarks>
    /// Blocks the calling thread for as long as the attempts keep failing.
    /// The connection is not started here; Subscribe does that.
    /// </remarks>
    private void CreateWebsphereQueueConnection () {
        // Pause between failed attempts so an unreachable queue manager is not
        // retried in a tight loop.
        const int retryDelayMilliseconds = 1000;

        // Keep the configured factory. SetConnectionFactory only returns it, so
        // discarding the result left _connectionfactory unassigned for the
        // CreateConnection call below.
        _connectionfactory = SetConnectionFactory ();

        // Decide once whether a connection is needed. Re-reading
        // _reConnectOnConnectionBreak on every pass never ended the loop during
        // a reconnect, because the exception listener clears that flag only
        // after this method returns - each pass opened one more connection.
        bool attemptNeeded = !_connected || _reConnectOnConnectionBreak;
        while (attemptNeeded) {
            IConnection connection = null;
            try {
                //Connection
                connection = _connectionfactory.CreateConnection (null, null);
                connection.ExceptionListener = new ExceptionListener (OnConnectionException);

                //Session (non-transacted, auto-acknowledge)
                ISession session = connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

                //Destination
                IDestination destination = session.CreateQueue ("queue://My.Queue.Name");
                // NOTE(review): 2 is presumably the persistent delivery mode and 0
                // the default target-client setting - confirm against XMSC.
                destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
                destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

                //Consumer
                IMessageConsumer consumer = session.CreateConsumer (destination);

                // Publish to the fields only once the whole chain exists, so the
                // fields never point at a half-built connection.
                _connection = connection;
                _session = session;
                _destination = destination;
                _consumer = consumer;
                _connected = true;
                attemptNeeded = false;
            } catch (Exception) {
                _connected = false;

                // Release the handle of a connection that was opened before a
                // later step failed; otherwise every failed attempt leaks one.
                if (connection != null) {
                    try {
                        connection.Close ();
                    } catch (Exception) {
                        // The connection is unusable anyway; nothing more to release.
                    }
                }

                System.Threading.Thread.Sleep (retryDelayMilliseconds);
            }
        }
    }
    
    /// <summary>
    /// Creates an XMS connection factory for the <c>XMSC.CT_WMQ</c> connection
    /// type and configures it from <c>ConnectionSettings</c>.
    /// </summary>
    /// <returns>The configured factory. It is only returned, not stored in a field.</returns>
    private IConnectionFactory SetConnectionFactory () {
        var xmsFactories = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
        var configured = xmsFactories.CreateConnectionFactory ();

        // Channel, client mode and quiesce behaviour.
        configured.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
        configured.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
        configured.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);

        // Queue manager and the list of connection names to try.
        configured.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
        configured.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

        // Client reconnect settings.
        configured.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
        configured.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

        // Provider version and syncpoint behaviour for gets.
        configured.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
        configured.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

        return configured;
    }
    
    /// <summary>
    /// Attaches the message listener to the consumer, starts the connection,
    /// and installs an exception listener that closes the old connection,
    /// rebuilds it and subscribes again when the reported MQ reason code is
    /// <c>MQRC_Q_MGR_QUIESCING</c> or <c>MQRC_CONNECTION_BROKEN</c>.
    /// </summary>
    /// <typeparam name="T">Type of the value handed to <paramref name="onMessageReceived"/>.</typeparam>
    /// <param name="onMessageReceived">Callback invoked from the message listener.</param>
    public override void Subscribe<T> (Action<T> onMessageReceived) {
        try {

            _connection.ExceptionListener = delegate (Exception connectionException) {
                // Not every exception carries an MQ reason code. Use "as" rather
                // than casts, which would throw from inside the listener.
                XMSException xmsError = connectionException as XMSException;
                if (xmsError == null) {
                    return;
                }

                IBM.WMQ.MQException mqError = xmsError.LinkedException as IBM.WMQ.MQException;
                if (mqError == null) {
                    return;
                }

                int reasonCode = mqError.ReasonCode;
                if (reasonCode != MQC.MQRC_Q_MGR_QUIESCING && reasonCode != MQC.MQRC_CONNECTION_BROKEN) {
                    return;
                }

                _reConnectOnConnectionBreak = true;
                try {
                    try {
                        // Release the old handle before opening a new connection.
                        _connection.Close ();
                    } catch (Exception) {
                        // The connection is already broken; a failing Close must
                        // not prevent the reconnect below.
                    }

                    CreateWebsphereQueueConnection ();
                    Subscribe (onMessageReceived);
                } finally {
                    // Always clear the request, even when the reconnect throws.
                    _reConnectOnConnectionBreak = false;
                }
            };

            MessageListener messageListener = new MessageListener ((msg) => {
                // NOTE(review): "message" is not defined in this snippet; presumably
                // it is the payload extracted from msg - confirm against the full source.
                onMessageReceived (message);
            });
            _consumer.MessageListener = messageListener;

            // Start the connection
            _connection.Start ();
        } catch (Exception ex) {
            //Log exception details
        }
    }
    

    There is no better way to check the state of an IConnection in IBM MQ version 8, so I had to use the reason codes. In IBM MQ version 9, we can use the REST API exposed by the server to check the connection state.