Tested with Paho version 1.2.5.
In this example, I am sending a message to the topic root/msg/1/data. I have two subscriptions: root/msg/1/# and root/msg/+/#. Both subscriptions match the message sent to root/msg/1/data. I expected the message listener to be called once for each subscription. However, to my surprise, it was called twice for each subscription.
import java.nio.charset.StandardCharsets;

import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;

public class MqttPahoExample {
    public static void main(String[] args) throws MqttException {
        MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
        MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
                .username("user")
                .password("password".getBytes())
                .cleanStart(false)
                .requestResponseInfo(true)
                .build();
        mqttConnectionOptions.setSendReasonMessages(false);
        MqttClient mqttClient = new MqttClient("tcp://localhost:61616", "Client-01");
        mqttClient.connect(mqttConnectionOptions);

        // Two subscriptions, both will match the incoming data
        MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
        MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);
        // The same listener instance is registered for both subscriptions
        IMqttMessageListener iMqttMessageListener = (topic, message1) ->
                System.out.println("topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());
        MqttSubscription[] mqttSubscriptions = {mqttSubscription1, mqttSubscription2};
        IMqttMessageListener[] mqttMessageListeners = {iMqttMessageListener, iMqttMessageListener};
        mqttClient.subscribe(mqttSubscriptions, mqttMessageListeners);

        // Publish message
        MqttMessage message = new MqttMessage("TestMessage".getBytes());
        message.setQos(2);
        mqttClient.publish("root/msg/1/data", message);
    }
}
Expected output
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1
Actual output
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1
I was expecting to receive 1 message for each subscription.
EDIT: I am using the Paho MQTT v5 client, so, as suggested by @Brits in the answer below, I tried the same example with subscription identifiers. I was still getting an extra copy of the message for each overlapping subscription. It seems to depend mostly on how each broker implementation interprets this sentence from the specification:
Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.
So I tried different MQTT brokers:
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:1
root/msg/1/# topic:root/msg/1/data message:TestMessage id:2
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2
Code using subscription identifiers:
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class MqttPahoFinalAsync {
    public static void main(String[] args) throws MqttException {
        MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
        MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
                .username("user")
                .password("password".getBytes())
                .cleanStart(true)
                .requestReponseInfo(true)
                .build();
        mqttConnectionOptions.setUseSubscriptionIdentifiers(true);
        MqttAsyncClient mqttClient = new MqttAsyncClient("tcp://localhost:1883", "Client-01");
        mqttClient.connect(mqttConnectionOptions).waitForCompletion();

        // Two subscriptions, both will match the incoming data
        // Subscription 1
        MqttProperties subProperties1 = new MqttProperties();
        subProperties1.setSubscriptionIdentifiers(List.of(0)); // Paho requires the list to be initialised like this
        subProperties1.setSubscriptionIdentifier(1);
        MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
        IMqttMessageListener iMqttMessageListener1 = (topic, message1) ->
                System.out.println("root/msg/1/# topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());
        mqttClient.subscribe(mqttSubscription1, null, null, iMqttMessageListener1, subProperties1).waitForCompletion();

        // Subscription 2
        MqttProperties subProperties2 = new MqttProperties();
        subProperties2.setSubscriptionIdentifiers(List.of(0)); // Paho requires the list to be initialised like this
        subProperties2.setSubscriptionIdentifier(2);
        MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);
        IMqttMessageListener iMqttMessageListener2 = (topic, message1) ->
                System.out.println("root/msg/+/# topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());
        mqttClient.subscribe(mqttSubscription2, null, null, iMqttMessageListener2, subProperties2).waitForCompletion();

        // Publish message
        MqttMessage message = new MqttMessage("TestMessage".getBytes());
        message.setQos(2);
        mqttClient.publish("root/msg/1/data", message);
    }
}
The MQTT V3 spec allows the server to send the message once, or multiple times:
When Clients make subscriptions with Topic Filters that include wildcards, it is possible for a Client’s subscriptions to overlap so that a published message might match multiple filters. In this case the Server MUST deliver the message to the Client respecting the maximum QoS of all the matching subscriptions [MQTT-3.3.4-2]. In addition, the Server MAY deliver further copies of the message, one for each additional matching subscription and respecting the subscription’s QoS in each case.
(V5 is similar but includes the concept of a "Subscription Identifier" which I cover below).
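To make the two permitted server behaviours concrete, here is a minimal sketch in plain Java (no MQTT library involved). The class `OverlapDeliverySketch` and its methods are hypothetical names invented for illustration, the matcher ignores special cases such as topics starting with `$`, and the message is assumed to be published at QoS 2 so that the subscription QoS is the one that applies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OverlapDeliverySketch {

    /** Returns true if the MQTT topic filter matches the concrete topic name. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;  // multi-level wildcard: matches the parent level and everything below
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level does not match
            }
        }
        return f.length == t.length;
    }

    /** One copy per matching subscription, each at that subscription's QoS (the "MAY" behaviour). */
    static List<Integer> copyPerSubscription(Map<String, Integer> subs, String topic) {
        List<Integer> copies = new ArrayList<>();
        for (Map.Entry<String, Integer> s : subs.entrySet()) {
            if (matches(s.getKey(), topic)) {
                copies.add(s.getValue());
            }
        }
        return copies;
    }

    /** A single copy at the maximum QoS of all matching subscriptions (the "MUST" minimum). */
    static List<Integer> singleCopy(Map<String, Integer> subs, String topic) {
        int max = -1;
        for (int qos : copyPerSubscription(subs, topic)) {
            max = Math.max(max, qos);
        }
        if (max < 0) {
            return List.of(); // no subscription matches, nothing is delivered
        }
        return List.of(max);
    }

    public static void main(String[] args) {
        Map<String, Integer> subs = new LinkedHashMap<>();
        subs.put("root/msg/1/#", 2);
        subs.put("root/msg/+/#", 2);
        System.out.println("single copy, QoS per copy:      " + singleCopy(subs, "root/msg/1/data"));
        System.out.println("copy per subscription, QoS:     " + copyPerSubscription(subs, "root/msg/1/data"));
    }
}
```

Both filters from the question match root/msg/1/data, so a broker that sends one copy per subscription delivers two PUBLISH packets, while a broker that sends a single copy delivers one.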
In your case you are subscribing with two overlapping topic filters, so when the message is published the server sends the PUBLISH to your client twice (with message IDs 1 and 2 in this case). If you used a different server you might see a different result, because this is something the spec allows but does not require.
Each copy of the message is then passed to every listener whose topic filter matches, so you get four lines of output (two copies, each handled by two listeners).
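The arithmetic behind the four lines can be reproduced with a simplified model of client-side dispatch. This is a sketch of the behaviour described above, not Paho's actual code: `ClientDispatchSketch`, `demoListeners` and `deliver` are hypothetical names, and regular expressions stand in for the two MQTT topic filters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class ClientDispatchSketch {

    /** Listeners keyed by their topic filter; each regex is a stand-in for that filter. */
    static Map<String, Pattern> demoListeners() {
        Map<String, Pattern> listeners = new LinkedHashMap<>();
        listeners.put("root/msg/1/#", Pattern.compile("root/msg/1(/.*)?"));
        listeners.put("root/msg/+/#", Pattern.compile("root/msg/[^/]+(/.*)?"));
        return listeners;
    }

    /** Passes every received PUBLISH copy to every listener whose filter matches the topic. */
    static List<String> deliver(Map<String, Pattern> listeners, String topic, int[] messageIds) {
        List<String> lines = new ArrayList<>();
        for (int id : messageIds) {                                 // one iteration per copy received
            for (Map.Entry<String, Pattern> l : listeners.entrySet()) {
                if (l.getValue().matcher(topic).matches()) {        // listener's filter matches the topic
                    lines.add(l.getKey() + " topic:" + topic + " id:" + id);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Broker sends one copy per overlapping subscription: two copies arrive.
        List<String> twoCopies = deliver(demoListeners(), "root/msg/1/data", new int[] {2, 1});
        twoCopies.forEach(System.out::println);
        System.out.println("lines with two copies: " + twoCopies.size());

        // Broker sends a single copy: each matching listener still fires once.
        List<String> oneCopy = deliver(demoListeners(), "root/msg/1/data", new int[] {1});
        System.out.println("lines with one copy:   " + oneCopy.size());
    }
}
```

Under this model the listener count only drops to one call per subscription when the broker sends a single copy, which is why the result varies between brokers.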
If you want to avoid this, consider using MQTT V5 with subscription identifiers. This would enable your message handlers to ignore any messages that do not relate to their specific SUBSCRIBE request.
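As a rough illustration of that filtering, the sketch below models handlers that check the subscription identifiers carried by each PUBLISH. It is plain Java with hypothetical types (`Publish`, `Handler`) rather than Paho's API; with the Paho v5 client the identifiers would presumably be read from the received message's properties, which is worth verifying against the library documentation. The model relies on the MQTT 5 rule that a single copy carries the identifiers of all matching subscriptions, while separate copies each carry the identifier of their own subscription.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscriptionIdFilterSketch {

    /** Hypothetical stand-in for a received PUBLISH and the subscription identifiers it carries. */
    static final class Publish {
        final String payload;
        final List<Integer> subscriptionIds;

        Publish(String payload, List<Integer> subscriptionIds) {
            this.payload = payload;
            this.subscriptionIds = subscriptionIds;
        }
    }

    /** Hypothetical handler tied to one SUBSCRIBE request through its subscription identifier. */
    static final class Handler {
        final int subscriptionId;
        final List<String> handled = new ArrayList<>();

        Handler(int subscriptionId) {
            this.subscriptionId = subscriptionId;
        }

        void onMessage(Publish p) {
            if (!p.subscriptionIds.contains(subscriptionId)) {
                return; // this copy was sent for a different subscription: ignore it
            }
            handled.add(p.payload);
        }
    }

    /** Feeds every received copy to both handlers and returns how many each one accepted. */
    static int[] handledCounts(List<Publish> received) {
        Handler h1 = new Handler(1); // e.g. root/msg/1/#
        Handler h2 = new Handler(2); // e.g. root/msg/+/#
        for (Publish p : received) {
            h1.onMessage(p);
            h2.onMessage(p);
        }
        return new int[] {h1.handled.size(), h2.handled.size()};
    }

    public static void main(String[] args) {
        // Broker sends one copy per subscription, each tagged with its own identifier.
        int[] perSub = handledCounts(List.of(
                new Publish("TestMessage", List.of(1)),
                new Publish("TestMessage", List.of(2))));
        // Broker sends a single copy tagged with both identifiers.
        int[] single = handledCounts(List.of(new Publish("TestMessage", List.of(1, 2))));
        System.out.println("copy per subscription: " + perSub[0] + " and " + perSub[1]);
        System.out.println("single copy:           " + single[0] + " and " + single[1]);
    }
}
```

In this model each handler accepts the message exactly once whichever strategy the broker chooses, which is the outcome the question expected.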