Search code examples
c++c++11mqttpaho

MQTT client waits indefinitely during publish of message


I try to implement an asynchronous MQTT client with the paho library, that receives messages on topic "request", formulates a string and puts the response out on topic "response". I use the callbacks to handle the incoming messages.

#include "mqtt/async_client.h"  
#include "mqtt/topic.h"

const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};

class TestCallback : public virtual mqtt::callback
{
    // the mqtt client
    mqtt::async_client& cli_;

    // (re)connection success
    void connected(const std::string& cause) override
    {
        cli_.subscribe("request", 0);
    }

   // callback for when a message arrives.
    void message_arrived(mqtt::const_message_ptr msg) override 
    {
        if( msg->get_topic() == "request" )
        {   
            /* format response message here and put it into (string) msg */

            mqtt::message_ptr pubmsg = mqtt::make_message("response", msg);
            pubmsg->set_qos(2);

            //// PROBLEMATIC CODE ////
            cli_.publish(pubmsg)->wait();
            //////////////////////////
        }
    }

public:
    TestCallback(mqtt::async_client& cli)
        : cli_(cli) {}
};



int main(int argc, char** argv)
{    
    mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
    TestCallback cb(cli);
    cli.set_callback(cb);

    mqtt::connect_options connOpts = mqtt::connect_options_builder()
        .clean_session(false)
        .automatic_reconnect()
        .finalize();

    try
    {
        cli.connect(connOpts)->wait();
    }
    catch (const mqtt::exception& exc)
    {
        std::cerr << "[ERROR] " << exc.what() << std::endl;
        return 1;
    }

    // run until the application is shut down
    while (std::tolower(std::cin.get()) != 'q')
            ;

    try
    {
        cli.disconnect()->wait();
    }
    catch (const mqtt::exception& exc)
    {
        std::cerr << "[ERROR] " << exc.what() << std::endl;
        return 1;
    }

    return 0;
}

The problem arises when I try to publish the response message, as the client seems to wait indefinitely. Responsible for this is the wait function which is used on a token to track the status of the published message (reference). To my understanding, this has to be done especially when using higher levels of QoS so ensure everything went well.

Upon removal of the call to wait(), it works as expected. But I am not sure if this ensures the correct publishing of messages.

What is the correct way to do this?


Solution

  • I'm going to make a guess here, because I don't really know how async works in C++.

    The MQTT client has a single message handling thread, this deals with all the incoming and outgoing TCP packets as they arrive/depart on the socket. When a new MQTT message arrives it then calls the message handler callback (message_arrived), in which you call publish and wait for it to complete. But because the call to wait effectively blocks message_arrived the message handling thread can not continue. This means it can not deal with the 3 legged QOS2 handshake required for the publish to complete, hence it hangs.

    I will also guess that if you changed the publish to QOS 0 it would complete, but would also fail with QOS 1 as that requires the message handling thread to send/receive multiple messages to continue.

    Not waiting for the publish to complete is probably the correct solution.