Search code examples
c#.netactivemq-classicapache-nms

ActiveMQ - No topic messages received from DurableConsumer


I´m trying to reveive messages from an ActiveMQ topic. In the web console i see that numerous messages are enqueued in the topic, but running the following code doesn´t return anything:

IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
using (IConnection connection = factory.CreateConnection())
{
     connection.Start();
     ISession session = connection.CreateSession();
     ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
     consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
     message = (ActiveMQTextMessage)consumer.Receive(TimeSpan.FromSeconds(vTimeOutSecs));
}

Any hint would be appreciated.


Solution

  • In order for a Durable subscription to receive messages sent while it was offline it must first be registered on the Broker. You register it by creating an instance like you have done in the code given and then once it goes offline, via a call to close() etc messages sent to it's Topic will be stored so that it can read them later. If you have not registered this consumer already then those messages that were sent to the Topic are just dropped.

    You also need a unique client Id for the Connection so that each time you reconnect you can re-subscribe the durable Topic consumer.

    Register the durable topic consumer:

    IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
    using (IConnection connection = factory.CreateConnection())
    {
        connectio.ClientId = "MyClientId";
        connection.Start();
        ISession session = connection.CreateSession();
        ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
        consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
    }
    

    Consume Messages at a later time:

    IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616?wireformat.version=2"));
    using (IConnection connection = factory.CreateConnection())
    {
        connection.ClientId = "MyClientId";
        connection.Start();
        ISession session = connection.CreateSession();
        ActiveMQTopic topic = new ActiveMQTopic("MARKETADAPTERS.ORDERBOOKSNAPSHOT");
        consumer = session.CreateDurableConsumer(topic,"OBSnap",null, false);
        message = (ActiveMQTextMessage)consumer.Receive(TimeSpan.FromSeconds(vTimeOutSecs));
    }