Search code examples
javamqttpahohivemq

MQTT5 message is lost on HiveMQ Cloud when correlationData is added


In a Java server application, we want to use the correlationData, which is part of MQTT5, so we can use this in the reply message to link and validate it when the reply is received.

I'm using the hivemq-mqtt-client library 1.2.2 and connecting to HiveMQ Cloud.

The connection is made like this:

private Mqtt5AsyncClient client;

public MqttConfig(Environment environment) {
    client = MqttClient.builder()
            .useMqttVersion5()
            .identifier("TestServer")
            .serverHost(MQTT_SERVER)
            .serverPort(MQTT_PORT)
            .sslWithDefaultConfig()
            .automaticReconnectWithDefaultConfig()
            .buildAsync();

    client.connectWith()
            .simpleAuth()
            .username(MQTT_USER)
            .password(MQTT_PASSWORD.getBytes())
            .applySimpleAuth()
            .send()
            .whenComplete((connAck, throwable) -> {
                if (throwable != null) {
                    logger.error("Could not connect to MQTT: {}", throwable.getMessage());
                } else {
                    logger.info("Connected to MQTT: {}", connAck.getReasonCode());
                }
            });
}

public Mqtt5AsyncClient getClient() {
    return client;
}

Sending a message is done with this method:

mqttConfig.getClient()
            .publishWith()
            .topic(destinationTopic)
            //.correlationData(correlationData.getBytes())
            .responseTopic(responseTopic)
            .payload(message.getBytes())
            .qos(MqttQos.AT_LEAST_ONCE)
            .send()
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    logger.info("Error sending to '{}': {} - {}", destinationTopic, message, throwable.getMessage());
                } else {
                    logger.info("Message sent to '{}': {} - {}", destinationTopic, message, result);
                }
            });

When monitoring the messages on http://www.hivemq.com/demos/websocket-client/ and on a subscriber, messages are only received when the line with `correlationData()' is commented, as you can see above.

Both with or without that line, the application logs a successful sending, e.g. with correlation data enabled:

Message sent to 'server/test': testMessage - MqttQos2Result{publish=MqttPublish{topic=server/test, payload=11byte, qos=EXACTLY_ONCE, retain=false, responseTopic=server/test/reply, correlationData=6byte}, pubRec=MqttPubRec{packetIdentifier=1}}

Any idea why the additional correlationData seems to cause that they are not shown on the websocket test page, and are not received on any of the subscribers?

As an experiment, I used the paho 5 library instead of the HiveMQ library with the following code, but had exactly the same behavior and needed to disable the line to see messages passing:

    MqttProperties properties = new MqttProperties();
    //properties.setCorrelationData(correlationData.getBytes());
    MqttMessage mqttMessage = new MqttMessage();
    mqttMessage.setQos(1);
    mqttMessage.setPayload(message.getBytes());
    mqttMessage.setProperties(properties);
    try {
        mqttConfig.getClient().publish(destinationTopic, mqttMessage);
        logger.info("Message was sent to '{}': {}", destinationTopic, message);
    } catch (MqttException ex) {
        logger.error("Error sending to '{}': {} - {}", destinationTopic, message, ex.getMessage());
    }

Solution

  • This behaviour has now been fixed by HiveMQ Cloud.