Search code examples
c#mqtt

Losing messages with MQTT with C# uPLibrary.Networking.M2Mqtt


I have a problem that I lose messages with MQTT although I send them with "QOS_LEVEL_EXACTLY_ONCE".

The loss is only when the receiver is not running and then starts later. These messages are then not collected.

Version of M2Mqtt is 4.3.0

If both clients, i.e. receiver and transmitter, are running, no messages are lost.

Only if the receiver is not running, the messages are prefetched during this time and do not arrive at the receiver.

I can't find any setting on the server(broker) for how long messages should be saved

sender

public class Programm
{

    static MqttClient mqttClient;
    static async Task Main(string[] args)
    {
        var locahlost = true;
        var clientName = "Sender 1";

        Console.WriteLine($"{clientName} Startet");

        var servr = locahlost ? "localhost" : "test.mosquitto.org";
        mqttClient = new MqttClient(servr);
        mqttClient.Connect(clientName);

        Task.Run(() =>
        {
            if (mqttClient != null && mqttClient.IsConnected)
            {
                for (int i = 0; i < 100; i++)
                {
                    var Message = $"{clientName} ->Test {i}";
                    mqttClient.Publish("Application1/NEW_Message", Encoding.UTF8.GetBytes($"{Message}"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, true);
                    Console.WriteLine(Message);
                    Thread.Sleep(i * 1000);
                }
            }
        });

        Console.WriteLine($"{clientName} End");
    }
}

Server

public class Programm
{


    static async Task Main(string[] args)
    {
        Console.WriteLine("Server");
        MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
                                     // set endpoint to localhost
                                     .WithDefaultEndpoint()
                                     // port used will be 707
                                     .WithDefaultEndpointPort(1883);
        // handler for new connections



        // creates a new mqtt server     
        IMqttServer mqttServer = new MqttFactory().CreateMqttServer();

        // start the server with options  
        mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();

        // keep application running until user press a key
        Console.ReadLine();
    }
}

Receiver

public class Programm
{
    static MqttClient mqttClient;


    static async Task Main(string[] args)
    {
        var clientName = "Emfänger 1";
        var locahlost = true;

        Console.WriteLine($"Start of {clientName}");
      

        Task.Run(() =>
        {
            var servr = locahlost ? "localhost" : "test.mosquitto.org";
            mqttClient = new MqttClient(servr);
            mqttClient.MqttMsgPublishReceived += MqttClient_MqttMsgPublishReceived;
            mqttClient.Subscribe(new string[] { "Application1/NEW_Message" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            mqttClient.Connect(clientName);
        });
        
        //  client.UseConnecedHandler(e=> {Console.WriteLine("Verbunden") });
        Console.ReadLine();
        Console.WriteLine($"end of  {clientName}");

        Console.ReadLine();


    }
    private static void MqttClient_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
    {
        var message = Encoding.UTF8.GetString(e.Message);
        Console.WriteLine(message);
    }


}

Solution

  • The default value for the Clean session flag when connecting to the broker with M2MQTT is true.

    This means that the broker will discard any queued messages.

    https://m2mqtt.wordpress.com/using-mqttclient/

    You need to set this to false to ensure the client receives the queued messages.

     mqttClient.Connect(clientName, false);