Retained MQTT messages being missed


I am trying to create a simple application that publishes some messages to a topic over MQTT (the library I am using is M2Mqtt.net). Once the messages have been sent, I want to subscribe to the topic, receive all of them, and have them discarded because they have been received.

I am using Mosquitto 2.0.12 as the broker.

This is the publisher:

public class MessagePublisher : IMessagePublisher
{
    private readonly MqttClient _client;

    public MessagePublisher()
    {
        _client = new MqttClient("localhost");

        // clean session needs to be set to false so that it retains all the missed messages, not just the last one
        _client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
    }

    public void Publish(string topic, string message, bool retain = false)
    {
        Console.Write($"Sent: {topic}, {message}");

        _client.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, retain);

        Total.SentAndReceived.Add(message, null);
    }
}

This is the listener:

public class MessageReceiver : IMessageReceiver
{
    private readonly MqttClient _client;

    public MessageReceiver()
    {
        _client = new MqttClient("localhost");
    }

    public void Subscribe(params string[] topics)
    {
        _client.Subscribe(topics, new[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });

        _client.MqttMsgPublishReceived += client_receivedMessage;
    }


    public void Connect()
    {
        // clean session needs to be set to false so that it retains all the missed messages, not just the last one
        _client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
    }

    public void Disconnect()
    {
        _client.Disconnect();
    }

    static void client_receivedMessage(object sender, MqttMsgPublishEventArgs e)
    {
        var message = Encoding.Default.GetString(e.Message);

        Console.WriteLine($"Message Received: {message}");

        if (Total.SentAndReceived.ContainsKey(message))
            Total.SentAndReceived[message] = message;
    }
}

And this is the main application:

public static class Program
{
    public static void Main(string[] args)
    {
        var messageReceiver = new MessageReceiver();

        var publisher = new MessagePublisher();

        for (var i = 1; i <= 10000; i++)
        {
            publisher.Publish("Devices/", i.ToString(), true);
        }

        messageReceiver.Subscribe("Devices/");

        messageReceiver.Connect();

        Thread.Sleep(5000);

        var b = Total.SentAndReceived.Where(x => x.Value == null);

        Console.WriteLine($"{b.Count()} Missed Messages");
    }
}

The problem I am having is that some messages are missed, and the number of missed messages changes every time I run the application. It is not the last n messages that are missed, it is the first n.

What I am hoping for is this: if I build a service that listens to the published messages and the service stops for any reason, then once it comes back online it should receive the messages that were sent during the downtime.


Solution

  • I think you have a misunderstanding around some terms here.

    First, MQTT does not generally queue messages. The only time the broker will queue messages is when the receiving client has already been connected and subscribed to the topic at QoS > 0. If that client then disconnects before the publisher sends the messages, the broker will queue them. They will only be delivered if the client then reconnects with the same client id and with the clean session flag set to false. This is the only way that messages will be queued up.

    Since you appear to be using randomly generated client ids (Guid.NewGuid().ToString()), this will not work: the broker has no way to match the reconnecting client to an earlier session. You also appear to be trying to subscribe before you connect, which won't work either.
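
    The sequence described above could look roughly like the following. This is only a sketch, assuming M2Mqtt and a broker on localhost as in the question. The client ids "device-listener-1" and "device-publisher-1" are made-up fixed ids, the class and helper names are hypothetical, and the Thread.Sleep calls are a crude stand-in for waiting on the library's acknowledgement events.

    ```csharp
    using System;
    using System.Text;
    using System.Threading;
    using uPLibrary.Networking.M2Mqtt;
    using uPLibrary.Networking.M2Mqtt.Messages;

    public static class PersistentSessionSketch
    {
        // Assumption: a fixed client id, so the broker can find the stored session again.
        private const string ReceiverId = "device-listener-1";
        private const string Topic = "Devices/";

        // Hypothetical helper: connect a receiver with cleanSession = false.
        private static MqttClient ConnectReceiver()
        {
            var client = new MqttClient("localhost");
            client.MqttMsgPublishReceived += (sender, e) =>
                Console.WriteLine($"Message Received: {Encoding.UTF8.GetString(e.Message)}");
            client.Connect(ReceiverId, "username", "password", false, 60);
            return client;
        }

        public static void Main()
        {
            // 1. Connect first, then subscribe at QoS > 0, then go offline.
            var receiver = ConnectReceiver();
            receiver.Subscribe(new[] { Topic }, new[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            Thread.Sleep(1000); // crude wait for the subscription to be acknowledged
            receiver.Disconnect();

            // 2. Publish while the receiver is offline. No retain flag is needed for queuing.
            var publisher = new MqttClient("localhost");
            publisher.Connect("device-publisher-1", "username", "password", true, 60);
            for (var i = 1; i <= 10; i++)
            {
                publisher.Publish(Topic, Encoding.UTF8.GetBytes(i.ToString()),
                    MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
            }
            Thread.Sleep(2000); // crude wait for the QoS 2 handshakes to finish
            publisher.Disconnect();

            // 3. Reconnect with the same client id and cleanSession = false.
            //    The broker should now deliver what it queued for this session.
            receiver = ConnectReceiver();
            Thread.Sleep(5000);
            receiver.Disconnect();
        }
    }
    ```

    The message count is kept small on purpose, since a broker may limit how many messages it queues for an offline client.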

    Secondly, retained messages have nothing to do with message queuing as described above. A message is retained if the retained flag is set at the point of publishing. The broker will then store that specific message and deliver it every time a client subscribes to a matching topic. This message will be sent before any other messages on the topic. If another message with the retained flag is published, it replaces the previous one; there can only be one retained message per topic. In your example, the 10,000 retained publishes to Devices/ each overwrite the one before, so only the last is stored as the retained message.
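
    To make the "one retained message per topic" rule concrete, here is a toy in-memory model. It is not broker code and does not use M2Mqtt; the class and method names are invented for illustration only. It just mirrors the rule that each retained publish overwrites the previous one for that topic.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Toy model of a broker's retained-message store: one slot per topic.
    public sealed class RetainedStoreModel
    {
        private readonly Dictionary<string, string> _retained = new Dictionary<string, string>();

        // A retained publish replaces whatever was stored for the topic.
        public void PublishRetained(string topic, string payload) => _retained[topic] = payload;

        // A new subscriber gets at most one retained message for the topic.
        public string SubscribeAndGetRetained(string topic) =>
            _retained.TryGetValue(topic, out var payload) ? payload : null;
    }

    public static class RetainedDemo
    {
        public static void Main()
        {
            var broker = new RetainedStoreModel();

            // Same loop as in the question: 10,000 retained publishes to one topic.
            for (var i = 1; i <= 10000; i++)
            {
                broker.PublishRetained("Devices/", i.ToString());
            }

            // Only the last payload survives.
            Console.WriteLine(broker.SubscribeAndGetRetained("Devices/")); // prints 10000
        }
    }
    ```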