Search code examples
mqtthivemqmqttnetmqtt.js

Retained messages in the MQTT Message Protocol


I'm working with two MQTT client libraries: one for Angular 14 (in my browser application), and the other is the MQTTnet 4.1.4 .NET client for .NET Framework 4.6.

At one point I was publishing an MQTT test message from my front-end application with retain: true. However, in my .Net logs I kept getting those messages over and over again - HUNDREDS OF THEM.

And now that I turned OFF the retain flag in my front end app, of course now those older retained messages keep flooding my logs.

Two questions :

  1. Why am I getting so many duplicates when subscribing to the topic myapp/from-har-app?

  2. How can I delete the RETAINED message? I read in the HiveMQ docs that I can just publish an empty payload on that same topic.

    // NOTE(review): Publish() (shown further down) wraps this string in an MqttModel, serializes it to JSON
    // and builds the message with WithRetainFlag(false), so the broker receives a non-empty, non-retained
    // payload here rather than an empty retained one.
    // NOTE(review): the topic below differs from the myapp/from-har-app topic seen in the logs - confirm it is the intended one.
    string message = "";
    this.Publish(message, "harmony/from-my-app"); // TRY TO DELETE RETAINED MSG !
    

Here's the OnSubscribe method in .Net 4.6 :

/// <summary>
/// Handler for messages received by the subscriber: writes the payload (as text) and the topic
/// to the application log at Info level.
/// </summary>
/// <param name="x">Event arguments carrying the received application message.</param>
/// <returns>An already-completed task; no asynchronous work is performed.</returns>
private Task OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
  {
      var received = x.ApplicationMessage;

      // Interpolation turns a null payload string into an empty string before it is logged.
      string payloadText = $"{received.ConvertPayloadToString()}";

      applog.Info($"Mqtt OnSubscriberMessageReceived() method - message/topic: {payloadText} {received.Topic}");
      return Task.CompletedTask;
  }

using log4net.Ext.EventID;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Timer = System.Timers.Timer;
using MqttNotificationService.Models;
using Newtonsoft.Json;

namespace MqttNotificationService
  {
    public partial class MqttService : ServiceBase
    {
        // Logger for this class, obtained from EventIDLogManager for the declaring type of the current method.
        public static readonly IEventIDLog applog = EventIDLogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

        // Managed client that Publish() enqueues outgoing messages on; Publish() assigns it from its
        // pubClient argument when this field is still null.
        private IManagedMqttClient managedMqttClientPublisher;
        // Managed client for the subscriber side - presumably created by StartSubscriber(); TODO confirm.
        private IManagedMqttClient managedMqttClientSubscriber;
        // Broker user name, read from the "MqttClientUser" app setting in Init().
        private string mqttClientUser = "";
        // Broker password bytes, passed together with the user name to WithCredentials().
        private byte[] mqttClientPswd;

        // Broker host used as the first part of the WebSocket URL built in WsSecureClientOptions().
        private string mqttBrokerAddress;
        // Defaults to "wss"; NOTE(review): not referenced by the methods visible here - confirm usage.
        private string mqttProtocol = "wss";
        // Broker port, interpolated into the WebSocket URL after the address.
        private int? mqttPort;
        private string defaultMessage;

        private string topicThisHost = "";      // get topic from app.config
        // Text placed at the start of every heartbeat message built in PublishHeartBeat().
        private string heartBeatPubMsg;
        // Assigned to the heartbeat Timer's Interval in StartHeartBeatTimer() (milliseconds).
        private double heartBeatTimerMs;
        
    /// <summary>
    /// Creates the service instance; the only work done is the call to InitializeComponent(),
    /// which is not defined in this part of the partial class.
    /// </summary>
    public MqttService() => InitializeComponent();

    /// <summary>
    /// Service start hook. Runs, in this order: Init(), CreateThreadAndRun() and StartHeartBeatTimer().
    /// </summary>
    /// <param name="args">Start arguments; not used by this method.</param>
    protected override void OnStart(string[] args)
    {
        Init();
        CreateThreadAndRun();
        // StartHeartBeatTimer() sleeps for five seconds before creating the timer, so this call blocks for at least that long.
        StartHeartBeatTimer();
    }

      /// <summary>
      /// Configures log4net through XmlConfigurator.Configure() and loads the "MqttClientUser"
      /// application setting into <c>mqttClientUser</c>.
      /// </summary>
      private void Init()
      {
        log4net.Config.XmlConfigurator.Configure();
        mqttClientUser = ConfigurationManager.AppSettings["MqttClientUser"];
        // pulling additional keys here...
      }

      /// <summary>
      /// Calls StartSubscriber(), then StartPublisher(), then CheckOtherServers(), in that order.
      /// </summary>
      private void StartPublisherAndSubscriber()
      {
          StartSubscriber();
          // The value returned by StartPublisher() is discarded (presumably a Task that is not awaited - TODO confirm).
          _ = StartPublisher();
          CheckOtherServers();
      }

      /// <summary>
      /// Blocks the calling thread for five seconds, then creates and starts a timer whose
      /// Elapsed event invokes <c>PublishHeartBeat</c> using <c>heartBeatTimerMs</c> as the interval.
      /// </summary>
      private void StartHeartBeatTimer()
      {
          // Initial delay before the heartbeat timer is created.
          Thread.Sleep(TimeSpan.FromSeconds(5));

          Timer heartBeatTimer = new Timer();
          heartBeatTimer.Elapsed += PublishHeartBeat;
          heartBeatTimer.Interval = heartBeatTimerMs;
          heartBeatTimer.Start();
      }

      /// <summary>
      /// Timer callback: builds the heartbeat text from <c>heartBeatPubMsg</c>, <c>MyHostName</c> and
      /// <c>hostIp</c>, hands it to Publish() on <c>topicThisHost</c> without awaiting the result,
      /// and then calls CheckOtherServers().
      /// </summary>
      /// <param name="source">Event sender; not used.</param>
      /// <param name="e">Elapsed event data; not used.</param>
      private void PublishHeartBeat(object source, ElapsedEventArgs e)
      {
          string heartBeatText = $"{heartBeatPubMsg}: {MyHostName} {hostIp}";

          // Fire-and-forget: the task returned by Publish() is deliberately discarded.
          _ = Publish(heartBeatText, topicThisHost);

          CheckOtherServers();
      }

       /// <summary>
       /// Wraps <paramref name="messageIn"/> in an <c>MqttModel</c>, serializes it to indented JSON and
       /// enqueues it on the managed publisher client for <paramref name="topic"/>.
       /// </summary>
       /// <remarks>
       /// The payload is always a non-empty JSON document and the retain flag is always off, so this
       /// method cannot clear a retained message on the broker; that requires an empty payload
       /// published on the same topic with the retain flag set.
       /// </remarks>
       /// <param name="messageIn">Text stored in the <c>message</c> property of the published model.</param>
       /// <param name="topic">Topic to publish on.</param>
       /// <param name="pubClient">
       /// Optional client that is adopted as the publisher when no publisher client has been assigned yet.
       /// </param>
       /// <returns>A task that completes once the message has been enqueued and logged.</returns>
       /// <exception cref="Exception">
       /// Thrown when enqueueing or the monitoring-log insert fails; the original exception is
       /// available as <see cref="Exception.InnerException"/>.
       /// </exception>
       public async Task Publish(string messageIn, string topic, IManagedMqttClient pubClient = null)
      {
          // MQTT numbers its QoS levels 0 = at most once, 1 = at least once, 2 = exactly once.
          // Any other configured value falls back to at-least-once.
          MqttQualityOfServiceLevel qos;
          switch (MqttQos)
          {
              case 0:
                  qos = MqttQualityOfServiceLevel.AtMostOnce;
                  break;
              case 1:
                  qos = MqttQualityOfServiceLevel.AtLeastOnce;
                  break;
              case 2:
                  qos = MqttQualityOfServiceLevel.ExactlyOnce;
                  break;
              default:
                  qos = MqttQualityOfServiceLevel.AtLeastOnce;
                  break;
          }

          MqttModel message = new MqttModel();
          message.message = messageIn;
          message.datestamp = DateTime.Now;
          message.source = "";
          message.status = "";
          var payload = JsonConvert.SerializeObject(message, Formatting.Indented);

          var send = new MqttApplicationMessageBuilder()
              .WithTopic(topic)
              .WithPayload(payload)
              .WithQualityOfServiceLevel(qos)
              .WithRetainFlag(false)
              .Build();

          // Adopt the caller-supplied client only when no publisher has been assigned yet.
          if (this.managedMqttClientPublisher == null)
          {
              this.managedMqttClientPublisher = pubClient;
          }

          if (this.managedMqttClientPublisher != null)
          {
              try
              {
                  applog.Debug($"Mqtt Service Publish() method - about to pub mqtt message EnqueueAsync() - {messageIn} / {topic} ");
                  await this.managedMqttClientPublisher.EnqueueAsync(send);
                  MonitoringLogs logs = new MonitoringLogs();
                  logs.InsertIntoLog(message);
              }
              catch (Exception ex)
              {
                  string errorMessage = $"Exception occured in Publish() method. {ex.Message}";
                  applog.Error(errorMessage);
                  // Keep the original exception as the inner exception so its type and stack trace are not lost.
                  throw new Exception(errorMessage, ex);
              }
          }
          else
          {
              applog.Info($"Mqtt Service Publish() method - managedMqttClientPublisher object appears to be NULL");
          }
      }
  
  /// <summary>
  /// Builds the managed-client options for a TLS 1.2 WebSocket connection to the broker, using
  /// MQTT v5, a clean session, the configured credentials and the certificate file named by
  /// <c>CertificateFileName</c> (resolved relative to the directory of this assembly).
  /// </summary>
  /// <returns>
  /// The built <see cref="ManagedMqttClientOptions"/>, or <c>null</c> when building the managed
  /// options throws (that exception is logged, not rethrown). Exceptions raised while loading the
  /// certificate file are not caught here and propagate to the caller.
  /// </returns>
  public ManagedMqttClientOptions WsSecureClientOptions()
        {
            // CodeBase is a file:// URI; convert it to a local path instead of stripping a fixed
            // number of characters, so an absolute certificate file name is not mangled.
            string assemblyCodeBase = Assembly.GetAssembly(typeof(MqttService)).CodeBase;
            string assemblyPath = Path.GetDirectoryName(new Uri(assemblyCodeBase).LocalPath);

            // Building out the secure wss url (both pfx/crt certificate file types appear to work here)
            var url = $"{mqttBrokerAddress}:{mqttPort}/mqtt";

            string filePath = Path.Combine(assemblyPath, CertificateFileName);
            string extension = Path.GetExtension(CertificateFileName);

            // pfx file contains both pub and priv keys (needs pswd); crt file only has pub key (no pswd req'd)
            X509Certificate2 x509Cert = null;
            if (string.Equals(extension, ".pfx", StringComparison.OrdinalIgnoreCase))
            {
                // using a PFX cert file via the X509 class
                x509Cert = new X509Certificate2(filePath, CertificatePwd);
            }
            else if (string.Equals(extension, ".crt", StringComparison.OrdinalIgnoreCase))
            {
                x509Cert = new X509Certificate2(filePath);
            }

            applog.Debug($"In WsSecureClientOptions(), Certificate Path - {filePath}");

            // Never hand a null entry to the TLS layer: an unrecognized extension leaves the list empty.
            var certificates = new List<X509Certificate2>();
            if (x509Cert != null)
            {
                certificates.Add(x509Cert);
            }
            else
            {
                applog.Error($"In WsSecureClientOptions(), unsupported certificate file type '{extension}' - no certificate loaded");
            }

            var clientOptionsBldr = new MqttClientOptionsBuilder()
                                        .WithProtocolVersion(MqttProtocolVersion.V500)
                                        .WithWebSocketServer(url)
                                        .WithClientId(clientId)
                                        .WithCleanSession()
                                        .WithCredentials(mqttClientUser, mqttClientPswd)
                                        .WithTls(
                                            new MqttClientOptionsBuilderTlsParameters()
                                            {
                                                UseTls = true,
                                                SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
                                                Certificates = certificates
                                            });

            ManagedMqttClientOptions managedClientOptions = null;
            try
            {
                applog.Debug($"In WsSecureClientOptions(), about to Build Publisher - {url}");
                managedClientOptions = new ManagedMqttClientOptionsBuilder()
                                                                .WithClientOptions(clientOptionsBldr)
                                                                .Build();
            }
            catch (Exception ex)
            {
                applog.Error("CERT ERROR ! Exception in WsSecureClientOptions() " + ex.Message);
            }

            return managedClientOptions;
        }

    /// <summary>
    /// Handler for the subscriber's connected event; performs no work.
    /// </summary>
    /// <returns>An already-completed task.</returns>
    private Task OnSubscriberConnected(MqttClientConnectedEventArgs _) => Task.CompletedTask;

    /// <summary>
    /// Handler for the subscriber's disconnected event; performs no work.
    /// </summary>
    /// <returns>An already-completed task.</returns>
    private Task OnSubscriberDisconnected(MqttClientDisconnectedEventArgs _) => Task.CompletedTask;
 }

Here's my log just now, after debugging my .Net project locally - the older retained message from 11:19:37 is still hitting my logs. The newest test msg I sent is the 3rd from bottom at 11:49:

2023-03-24 11:48:24   INFO  OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:48:57   INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:48:32   INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:04 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:06  INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:09   INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:16   INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:17  INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:18 ( DEBUG Mqtt Publish() method - I am alive, from  MqttWinSvc: : ... @2023-03-24 11:49:18 AM /client/internal/admin/host/MyHostName 


2023-03-24 11:49:19  INFO  Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:26   INFO  Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:27  INFO  Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app

*** (NEW MSG) 2023-03-24 11:49:33 (null) [20]  INFO  Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:49:32 AM myapp/from-har-app ***

2023-03-24 11:49:40 (null) [28]  INFO  Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:46 (null) [30]  INFO  Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app

It just seems that I'm getting too many duplicate messages in both the front-end and .NET clients. I'm also tweaking the QoS value between 0/1/2 to see what happens.

Thanks for the advice...


Solution

  • To clear a retained message from a topic you must do the following:

    • Publish a message with a null (empty) payload
    • It needs to be on the same topic
    • It must also have the retained flag set

    You can do this explicitly with the latest version of the mosquitto_sub tool, e.g.

    mosquitto_sub -t '#' --remove-retained --retained-only
    

    This will connect to a broker on localhost and remove ALL the retained messages.