How to handle producer flow control in JMS messaging when using Apache Qpid


I am trying to handle flow control on the producer side. I have a queue on a Qpid broker with a maximum queue size set, and with flow_stop_count and flow_resume_count also set on the queue.

The producer keeps producing messages until flow_stop_count is reached. Once that count is breached, an exception is thrown, which is handled by the exception listener. Some time later the consumer on the queue will catch up and flow_resume_count will be reached. The question is: how does the producer learn of this event?

Here is sample code for the producer:

    // Assumes connectionFactory, context (a JNDI Context) and the boolean
    // flags notStopped and suspend are defined elsewhere.
    Connection connection = connectionFactory.createConnection();
    connection.setExceptionListener(new MyExceptionListener());
    connection.start();
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue queue = (Queue) context.lookup("Test");
    MessageProducer producer = session.createProducer(queue);
    while (notStopped) {
        while (suspend) { // how do I reset this flag?
            Thread.sleep(1000);
        }
        TextMessage message = session.createTextMessage();
        message.setText("TestMessage");
        producer.send(message);
    }
    session.close();
    connection.close();

And the exception listener:

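    // suspend should be a volatile field of the enclosing class, since
    // onException is typically called from a provider thread.
    private class MyExceptionListener implements ExceptionListener {
        public void onException(JMSException e) {
            System.out.println("got exception:" + e.getMessage());
            suspend = true;
        }
    }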

The exception listener is a generic listener for all exceptions, so it is probably not a good idea to suspend the producer from there.

What I need is perhaps some method at the producer level, something like producer.isFlowStopped(), that I can check before sending a message. Does such functionality exist in the Qpid API?

There is some documentation on the Qpid website which suggests this can be done, but I couldn't find any examples of it anywhere.

Is there a standard way of handling this kind of scenario?


Solution

  • From what I have read in the Apache Qpid documentation, it seems that flow_stop_count and flow_resume_count will cause producers to be blocked.

    Therefore the only option would be to poll in software at regular intervals until messages start flowing again.

    Extract from the documentation:

    If a producer sends to a queue which is overfull, the broker will respond by instructing the client not to send any more messages. The impact of this is that any future attempts to send will block until the broker rescinds the flow control order.

    While blocking, the client will periodically log the fact that it is blocked waiting on flow control.

        WARN AMQSession - Broker enforced flow control has been enforced
        WARN AMQSession - Message send delayed by 5s due to broker enforced flow control
        WARN AMQSession - Message send delayed by 10s due to broker enforced flow control

    After a set period the send will time out and throw a JMSException to the calling code.

        ERROR AMQSession - Message send failed due to timeout waiting on broker enforced flow control.

    This documentation implies that the software managing the producer has to manage this itself. So when you receive an exception saying that the queue is overfull, you will need to back off, and most likely poll and retry sending your messages.
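
The back-off-and-retry idea described above could be sketched roughly as follows. This is only an illustration under stated assumptions, not part of the Qpid API: BackoffSender, SendAction and sendWithBackoff are hypothetical names, and the sketch catches any Exception so that it compiles without a JMS dependency. In a real producer you would more likely catch JMSException around producer.send(message).

```java
/** Hypothetical helper: retries a send with exponentially growing pauses. */
final class BackoffSender {

    /** Hypothetical stand-in for a call such as producer.send(message). */
    @FunctionalInterface
    public interface SendAction {
        void send() throws Exception;
    }

    /**
     * Runs the action until it succeeds or maxAttempts is used up.
     * Returns the number of attempts that were needed.
     */
    public static int sendWithBackoff(SendAction action, int maxAttempts,
                                      long initialDelayMs, long maxDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMs = initialDelayMs;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.send();
                return attempt; // the send went through
            } catch (Exception e) {
                lastFailure = e; // e.g. the flow control timeout
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMs); // back off before retrying
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", ie);
                }
                // Double the pause each time, up to the configured cap.
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
        throw new IllegalStateException(
                "Send failed after " + maxAttempts + " attempts", lastFailure);
    }
}
```

With a JMS producer, the call might look like `BackoffSender.sendWithBackoff(() -> producer.send(message), 5, 1000, 30000);`, which makes up to five attempts and pauses 1 s, 2 s, 4 s and 8 s between them. This keeps the retry logic next to the send itself, so the generic exception listener does not need to toggle a shared flag.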