java eclipse mqtt mosquitto paho

Does a publisher publishing at QoS 2 get the acknowledgement from the broker or from the subscriber?


I am a little confused about QoS. I read that if QoS is set to 2, the broker/client will deliver the message exactly once by using a four-step handshake.

So does QoS 2 only confirm that the message was published to the broker, and not that it was received by the subscriber (client)? Or does it confirm that the subscriber received the message?

Or, to get an acknowledgement, do we need to build it into the application? For example, the publisher publishes the message on a topic such as "DATA" and subscribes to a topic such as "ACK", and the subscriber publishes an acknowledgement on "ACK" saying that the message on "DATA" was received.

I created one Java class for publishing data and another class for subscribing.

In the following code I publish at QoS 2, and in the deliveryComplete function I get an exception when I call getMessage(). When I tried with QoS 0, getMessage() did not throw any exception.

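import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class PublishMe implements MqttCallback{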
    MqttClient myClient;
    MqttClient myClientPublish;
    MqttConnectOptions connOpt;
    MqttConnectOptions connOptPublish;
    static final String BROKER_URL = "tcp://Ehydromet-PC:1883";

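    static Boolean msgACK=false;
    // controls whether this example publishes (checked in runClient)
    static final Boolean publisher = true;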
    public static void main(String[] args) {
        PublishMe smc = new PublishMe();
        smc.runClient();
    }
    @Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!");
    }

    @Override
    public void messageArrived(String string, MqttMessage message) throws Exception {
        System.out.println("-------------------------------------------------");
        System.out.println("| Topic:" + string);
        System.out.println("| Message: " + new String(message.getPayload()));
        System.out.println("-------------------------------------------------");
    }

    /**
     * deliveryComplete
     * This callback is invoked when a message published by this client
     * is successfully received by the broker.
     *
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            System.out.println("Message delivered successfully to topic : \"" + token.getMessage().toString() + "\".");
        } catch (Exception ex) {
            System.out.println(ex.getCause() + " -- " + ex.getLocalizedMessage() + " -- " + ex.getMessage() + " -- ");
        }
    }

    public void runClient() {
        connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(false);
        connOpt.setKeepAliveInterval(0);

        connOptPublish = new MqttConnectOptions();
        connOptPublish.setCleanSession(false);
        connOptPublish.setKeepAliveInterval(0);

        // Connect to Broker
        try {
            myClient = new MqttClient(BROKER_URL, "pahomqttpublish11");
            myClient.setCallback(this);
            myClient.connect(connOpt);

            myClientPublish = new MqttClient(BROKER_URL, "pahomqttpublish42");
            myClientPublish.setCallback(this);
            myClientPublish.connect(connOptPublish);
        } catch (MqttException e) {
            e.printStackTrace();
            System.exit(-1);
        }

        System.out.println("Connected to " + BROKER_URL);

        String myTopic = "sample";
        // String myTopic = "receiveDATA2";
        MqttTopic topic = myClientPublish.getTopic(myTopic);

        // publish messages if publisher
        if (publisher) {
            int i = 1;
            while (true) {
                String pubMsg = "sample msg " + i;
                i++; // advance the counter so each message is distinct

                MqttMessage message = new MqttMessage(pubMsg.getBytes());
                System.out.println(message);
                message.setQos(2);
                message.setRetained(false);

                // Publish the message
                MqttDeliveryToken token = null;
                try {
                    // publish message to broker
                    token = topic.publish(message);
                    // Wait until the message has been delivered to the broker
                    token.waitForCompletion();
                    msgACK = false;
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }


}

Below is the subscriber:

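import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class Mqttsample implements MqttCallback{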
    MqttClient myClient;
    MqttClient myClientPublish;
    MqttConnectOptions connOpt;
    MqttConnectOptions connOptPublish;
    static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
    // the following two flags control whether this example is a publisher, a subscriber or both
    static final Boolean subscriber = true;
    static final Boolean publisher = true;
    public static void main(String[] args) {
        Mqttsample smc = new Mqttsample();
        smc.runClient();
    }

    @Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!");
        // code to reconnect to the broker would go here if desired
    }

    @Override
    public void messageArrived(String string, MqttMessage message) throws Exception {
        System.out.println("| Topic:" + string + "| Message: " + new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
        } catch (Exception ex) {
            System.out.println("delivery Error " + ex.getMessage());
        }
    }



    public void runClient() {
        connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(false);
        connOpt.setKeepAliveInterval(0);

        connOptPublish = new MqttConnectOptions();
        connOptPublish.setCleanSession(false);
        connOptPublish.setKeepAliveInterval(0);

        // Connect to Broker
        try {
            myClient = new MqttClient(BROKER_URL, "pahomqttpublish");
            myClient.setCallback(this);
            myClient.connect(connOpt);

            myClientPublish = new MqttClient(BROKER_URL, "pahomqttsubscribe");
            myClientPublish.setCallback(this);
            myClientPublish.connect(connOptPublish);
        } catch (MqttException e) {
            e.printStackTrace();
            System.exit(-1);
        }

        System.out.println("Connected to " + BROKER_URL);

        // subscribe to topic if subscriber
        if (subscriber) {
            try {
                //String myTopicACK = M2MIO_DOMAIN + "/" + "ACK" + "/" + M2MIO_THING;
                String myTopicACK = "sample";
                // MqttTopic topicACK = myClient.getTopic(myTopicACK);
                int subQoS = 2;

                myClient.subscribe(myTopicACK, subQoS);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

How can I make sure that the subscriber has received the message? What do I need to implement in the publisher code?

From http://www.eclipse.org/paho/files/mqttdoc/Cclient/qos.html:

QoS2, Exactly once: The message is always delivered exactly once. The message must be stored locally at the sender, until the sender receives confirmation that the message has been published by the receiver. The message is stored in case the message must be sent again. QoS2 is the safest, but slowest mode of transfer.


Solution

  • As you have determined, the higher QoS levels describe delivery of messages between a client (publisher or subscriber) and the broker only, not end to end from publisher to subscriber.

    This is very deliberate: in a pub/sub protocol there is no way to know how many subscribers there may be to a topic. There could be any number from 0 to n. Publishers and subscribers can also interact with a topic at different QoS levels (a publisher can publish at QoS 2 while a subscriber subscribes at QoS 0). Messages can also be published as retained messages, such that the last retained message will always be delivered to a newly subscribing client.
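
To make the per-hop behaviour concrete, here is a minimal sketch in plain Java. It only models packet names; it does not use Paho or contact a broker, and the class Qos2Hops and its methods are hypothetical names introduced for illustration. It lists the four QoS 2 packets (PUBLISH, PUBREC, PUBREL, PUBCOMP) exchanged on one hop, and applies the MQTT 3.1.1 rule that the broker-to-subscriber hop uses the lower of the publish QoS and the subscription QoS.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only: it builds packet traces and does not talk to a broker. */
class Qos2Hops {
    // The four control packets of an MQTT QoS 2 exchange, in protocol order.
    enum Packet { PUBLISH, PUBREC, PUBREL, PUBCOMP }

    /** Lists the packets exchanged on ONE hop; PUBLISH and PUBREL travel sender to receiver. */
    static List<String> hop(String sender, String receiver) {
        List<String> trace = new ArrayList<>();
        for (Packet p : Packet.values()) {
            boolean fromSender = p == Packet.PUBLISH || p == Packet.PUBREL;
            String from = fromSender ? sender : receiver;
            String to = fromSender ? receiver : sender;
            trace.add(from + " -> " + to + ": " + p);
        }
        return trace;
    }

    /** QoS used on the broker-to-subscriber hop: the lower of publish and subscription QoS. */
    static int deliveredQos(int publishQos, int subscriptionQos) {
        return Math.min(publishQos, subscriptionQos);
    }

    public static void main(String[] args) {
        // Hop 1 completes (and deliveryComplete fires) without any subscriber involvement.
        hop("publisher", "broker").forEach(System.out::println);
        // Hop 2 is a separate exchange that the publisher never sees.
        hop("broker", "subscriber").forEach(System.out::println);
        System.out.println("delivered QoS = " + deliveredQos(2, 0));
    }
}
```

The two calls to hop are independent on purpose: the publisher's handshake finishes with the broker, whether or not any subscriber exists.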

    All storage on the clients needed to meet the QoS contract should be handled by the MQTT library you are using (in this case Paho).

    The deliveryComplete callback only indicates that the publisher has finished delivering the message to the broker (for QoS 2, that the handshake with the broker has completed). Also, the documentation says that token.getMessage() returns null once the message has been delivered, which would explain the exception you mentioned, most likely a NullPointerException from calling a method on the null result (I have to guess here, since you didn't include the exception).

    If your application architecture really requires end-to-end acknowledgement of messages, then you will need to implement something similar to what you have described. To make sure it works properly, include a message id in the payload of your message. The acknowledgement message should include this id and probably some way of identifying which subscriber is replying, so that you know who received the message. The only reason I would use something like this is if there is a time requirement for acknowledging the message. If time is not a relevant factor, then look at using persistent sessions to ensure that messages are delivered to a subscribing client when it reconnects, if it was disconnected at the time of publishing.
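
The bookkeeping for such an application-level acknowledgement could look like the sketch below. It is plain Java with no MQTT calls, so it shows only the correlation logic. The class AckTracker, its method names, and the "id|rest" payload layout are assumptions made for illustration, not part of Paho. The sketch also assumes that message ids never contain the '|' character. In a real client, the string from wrap would be published on "DATA", the string from ackFor would be published on "ACK", and onAck would be called from the publisher's messageArrived.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: correlates messages published on "DATA" with replies on "ACK". */
class AckTracker {
    // message id -> ids of the subscribers that have acknowledged it so far
    private final Map<String, Set<String>> acks = new ConcurrentHashMap<>();

    /** Publisher side: registers the id and returns the payload to publish on "DATA". */
    String wrap(String messageId, String body) {
        acks.put(messageId, ConcurrentHashMap.newKeySet());
        return messageId + "|" + body;
    }

    /** Subscriber side: builds the "ACK" payload for a payload received on "DATA". */
    static String ackFor(String dataPayload, String subscriberId) {
        int sep = dataPayload.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("payload has no message id");
        }
        return dataPayload.substring(0, sep) + "|" + subscriberId;
    }

    /** Publisher side: records an "ACK" payload; returns false if the id is unknown or malformed. */
    boolean onAck(String ackPayload) {
        int sep = ackPayload.indexOf('|');
        if (sep < 0) {
            return false;
        }
        Set<String> who = acks.get(ackPayload.substring(0, sep));
        if (who == null) {
            return false;
        }
        who.add(ackPayload.substring(sep + 1));
        return true;
    }

    /** True once the given subscriber has acknowledged the given message. */
    boolean isAckedBy(String messageId, String subscriberId) {
        Set<String> who = acks.get(messageId);
        return who != null && who.contains(subscriberId);
    }

    public static void main(String[] args) {
        AckTracker tracker = new AckTracker();
        String data = tracker.wrap("42", "sample msg 1"); // publisher sends this on "DATA"
        String ack = ackFor(data, "subscriber-A");        // subscriber sends this on "ACK"
        tracker.onAck(ack);                               // publisher receives it on "ACK"
        System.out.println(tracker.isAckedBy("42", "subscriber-A")); // prints true
    }
}
```

Keeping a set of subscriber ids per message, rather than a single flag, reflects the point above that a topic can have any number of subscribers. The publisher decides for itself which acknowledgements it is waiting for and how long to wait.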