Search code examples
mqtthivemqmqtt.js

.net client keeps Connecting and Disconnecting over and over again


I'm running a Windows Service project in VS2019, with Mqtt 4.1.4 .Net framework client.

My .net client keeps Connecting and Disconnecting over and over again.

I recently discovered that my OnSubscriberDisconnected method is passing the following args values:

args.Reason = SessionTakenOver

args.ReasonString = "Another client connected with the same client id."

Initially I had been creating a new random ClientID upon each HiveMq Broker connection (the free cloud version), But I changed it to :

 clientId = ".netWinSvc-" + this.machineName;

this way the machine which is running my Win Service code will ALWAYS conn with the same ClientID.

I believe I was piling up lots of new ClientID connections, and asking the broker to persist the session (CleanSession = false). And the free cloud subscription allows for 100 device connections.

QUESTION IS: What do I do to clean up all of these clientID connections, and how avoid this Disconnect/Reconnect issue? Is re-using the same ClientID with CleanSession = false the best way to go ? In other words, shouldn't I ask the broker to persist my ClientID connection?

Here is a good portion of my .Net Window Service code:

using log4net.Ext.EventID;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Timer = System.Timers.Timer;
using MqttNotificationService.Models;
using Newtonsoft.Json;

namespace MqttNotificationService
  {
    public partial class MqttService : ServiceBase
    {
        public static readonly IEventIDLog applog = EventIDLogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

        private IManagedMqttClient managedMqttClientPublisher;
        private IManagedMqttClient managedMqttClientSubscriber;
        private string mqttClientUser = "";
        private byte[] mqttClientPswd;

        private string mqttBrokerAddress;
        private string mqttProtocol = "wss";
        private int? mqttPort;
        private string defaultMessage;

        private string topicThisHost = "";      // get topic from app.config
        private string heartBeatPubMsg;
        private double heartBeatTimerMs;
        
    public MqttService()
    {
        InitializeComponent();
    }

    protected override void OnStart(string[] args)
    {
        Init();
        CreateThreadAndRun();
        StartHeartBeatTimer();
    }

      private void Init()
      {
        log4net.Config.XmlConfigurator.Configure();
        mqttClientUser = ConfigurationManager.AppSettings["MqttClientUser"];
        mqttBrokerAddress = ConfigurationManager.AppSettings["MqttBrokerAddress"];
        mqttProtocol = ConfigurationManager.AppSettings["MqttProtocol"];
        mqttPort = Int16.Parse(ConfigurationManager.AppSettings["MqttPort"]);
        MqttUseTls = bool.Parse(ConfigurationManager.AppSettings["UseTlsCertificate"]);
        var MqttQos = Int16.Parse(ConfigurationManager.AppSettings["QualityOfService"]);
        mqttRetained = bool.Parse(ConfigurationManager.AppSettings["MqttRetained"]);
        mqttLastWillRetained = bool.Parse(ConfigurationManager.AppSettings["MqttLastWillRetained"]);
        mqttLastWillMessage = ConfigurationManager.AppSettings["MqttLastWillMessage"];
        mqttKeepAliveSeconds = Int16.Parse(ConfigurationManager.AppSettings["MqttLastWillKeepAliveSeconds"]);
        CertificateFileName = ConfigurationManager.AppSettings["CertificateFileName"];
        CertificatePwd = ConfigurationManager.AppSettings["CertificatePswd"];

        defaultMessage = ConfigurationManager.AppSettings["DefaultPubMessage"];
        topicSubFromHar = ConfigurationManager.AppSettings["MqttTopicSubFromHar"];
        topicThisHost = ConfigurationManager.AppSettings["MqttTopicThisHost"];
        heartBeatPubMsg = ConfigurationManager.AppSettings["HeartBeatPubMessage"];
        heartBeatTimerMs = Double.Parse(ConfigurationManager.AppSettings["HeartBeatTimerMs"]);
        pingDicom = bool.Parse(ConfigurationManager.AppSettings["CheckDicomServers"]);
        SynergyHostName = ConfigurationManager.AppSettings["SynergyHostName"];

        machineName = Dns.GetHostName();
        hostIp = Dns.GetHostEntry(machineName)
            .AddressList
            .FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork)
            .ToString();
       
        clientId = ".netWinSvc-" + this.machineName;

        QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
        switch (MqttQos)
        {
            case 0:
                QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
                break;
            case 1:
                QosThisHost = MqttQualityOfServiceLevel.AtMostOnce;
                break;
            case 2:
                QosThisHost = MqttQualityOfServiceLevel.ExactlyOnce;
                break;
        }
      }

      public void CreateThreadAndRun()
      {
          Thread m_Thread = new Thread(new ThreadStart(StartPublisherAndSubscriber));
          m_Thread.SetApartmentState(ApartmentState.STA);
          m_Thread.Name = "MT";
          m_Thread.Priority = ThreadPriority.Highest;
          m_Thread.Start();
      }

      private void StartPublisherAndSubscriber()
      {
          StartSubscriber();
          _ = StartPublisher();
          CheckOtherServers();
      }

      private void StartHeartBeatTimer()
      {
          TimeSpan ts = new TimeSpan(0, 0, 5);
          Thread.Sleep(ts);

          Timer timer = new Timer();
          timer.Elapsed += new ElapsedEventHandler(PublishHeartBeat);
          timer.Interval = heartBeatTimerMs;
          timer.Enabled = true;
      }

      private void PublishHeartBeat(object source, ElapsedEventArgs e)
      {
          var message = $"{ this.heartBeatPubMsg}: { MyHostName} {hostIp}";
          _ = this.Publish(message, topicThisHost);
          this.CheckOtherServers();
      }

      private async void StartSubscriber()
        {
          applog.Debug($"In StartSubscriber()");

          var mqttFactory = new MQTTnet.MqttFactory();

          managedMqttClientSubscriber = mqttFactory.CreateManagedMqttClient();
          managedMqttClientSubscriber.ConnectedAsync += OnSubscriberConnected;
          managedMqttClientSubscriber.DisconnectedAsync += OnSubscriberDisconnected;
          managedMqttClientSubscriber.ApplicationMessageReceivedAsync += this.OnSubscriberMessageReceived;

          // If tls is enabled in app.config, we use wss with cert file
          if (MqttUseTls)
          {
              var managedClientOptions = WsSecureClientOptions();
              await managedMqttClientSubscriber.StartAsync(managedClientOptions);
          }
          else
          {
              var insecureOptions = WsInsecureOptions();

              await this.managedMqttClientSubscriber.StartAsync(
                      new ManagedMqttClientOptions
                      {
                          ClientOptions = insecureOptions
                      });
          }



          List<MqttTopicFilter> topicFilter = new List<MqttTopicFilter>();
          topicFilter.Add(new MqttTopicFilter { Topic = topicThisHost });
          topicFilter.Add(new MqttTopicFilter { Topic = topicSubFromHar });

          Console.WriteLine("We have subscribed to multiple !");

          await this.managedMqttClientSubscriber.SubscribeAsync(topicFilter);
     }

      public async Task StartPublisher()
      {
          var mqttFactory = new MqttFactory();
          this.managedMqttClientPublisher = mqttFactory.CreateManagedMqttClient();

          // If tls is enabled in app.config, we use wss with cert file
          if (MqttUseTls)
          {
              var managedClientOptions = WsSecureClientOptions();
              managedClientOptions.AutoReconnectDelay = TimeSpan.FromSeconds(10);
              await this.managedMqttClientPublisher.StartAsync(managedClientOptions);
          }
          else
          {
              var insecureOptions = WsInsecureOptions();
              await this.managedMqttClientPublisher.StartAsync(
                  new ManagedMqttClientOptions
                  {
                      ClientOptions = insecureOptions
                  });
          }

          applog.Debug($"In StartPublisher()");

          await Publish($"{defaultMessage}  -  Machine: {this.machineName}, Host: {this.SynergyHostName}", this.topicThisHost);            
      }
        
       public async Task Publish(string messageIn, string topic, IManagedMqttClient pubClient = null)
      {
          MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce;
          switch (MqttQos)
          {
              case 0:
                  qos = MqttQualityOfServiceLevel.AtLeastOnce;
                  break;
              case 1:
                  qos = MqttQualityOfServiceLevel.AtMostOnce;
                  break;
              case 2:
                  qos = MqttQualityOfServiceLevel.ExactlyOnce;
                  break;
          }
          MqttModel message = new MqttModel();
          message.message = messageIn;
          message.datestamp = DateTime.Now;
          message.source = "";
          message.status = "";
          var payload = JsonConvert.SerializeObject(message, Formatting.Indented);

          var send = new MqttApplicationMessageBuilder()
              .WithTopic(topic)
              .WithPayload(payload)
              .WithQualityOfServiceLevel(qos)
              .WithRetainFlag(false)
              .Build();

          if (this.managedMqttClientPublisher == null)
          {
              this.managedMqttClientPublisher = pubClient;
          }

          if (this.managedMqttClientPublisher != null)
          {
              try
              {
                  applog.Debug($"Mqtt Service Publish() method - about to pub mqtt message EnqueueAsync() - {messageIn} / {topic} ");
                  await this.managedMqttClientPublisher.EnqueueAsync(send);
                  MonitoringLogs logs = new MonitoringLogs();
                  logs.InsertIntoLog(message);
              }
              catch (Exception ex)
              {
                  string errorMessage = $"Exception occured in Publish() method. {ex.Message}";
                  applog.Error(errorMessage);
                  throw new Exception(errorMessage);
              }
          }
          else
          {
              applog.Info($"Mqtt Service Publish() method - managedMqttClientPublisher object appears to be NULL");
          }
      }
  
  public ManagedMqttClientOptions WsSecureClientOptions()
        {
            string assemblyPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(MqttService)).CodeBase);

            // Building out the secure wss url (both pfx/crt certificate file types appear to work here)
            var url = $"{mqttBrokerAddress}:{mqttPort}/mqtt";

            X509Certificate2 x509Cert = null;
            var file = CertificateFileName;
            var filePath = Path.Combine(assemblyPath, file).Remove(0, 6);

            // pfx file contains both pub and priv keys (needs pswd); crt file only has pub key (no pswd req'd)
            if (Path.GetExtension(CertificateFileName.ToLower()) == ".pfx")
            {
                // using a PFX cert file via the X509 class
                x509Cert = new X509Certificate2(filePath, CertificatePwd);
            }
            else if (Path.GetExtension(CertificateFileName.ToLower()) == ".crt")
            {
                x509Cert = new X509Certificate2(filePath);
            }

            applog.Debug($"In WsSecureClientOptions(), Certificate Path - {filePath}");          

            var clientOptionsBldr = new MqttClientOptionsBuilder()
                                        .WithProtocolVersion(MqttProtocolVersion.V500)
                                        .WithWebSocketServer(url)
                                        .WithCredentials(mqttClientUser, mqttClientPswd)
                                        .WithClientId(clientId)
                                        .WithCleanSession()
                                        .WithCredentials(mqttClientUser, mqttClientPswd)
                                        .WithTls(
                                            new MqttClientOptionsBuilderTlsParameters()
                                            {
                                                UseTls = true,
                                                SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
                                                Certificates = new List<X509Certificate2>() { x509Cert }                                              
                                            });
            ManagedMqttClientOptions managedClientOptions = null;
            try
            {
                applog.Debug($"In WsSecureClientOptions(), about to Build Publisher - ${url}");
                managedClientOptions = new ManagedMqttClientOptionsBuilder()
                                                                .WithClientOptions(clientOptionsBldr)
                                                                .Build();
            }
            catch (Exception ex)
            {
                applog.Error("CERT ERROR ! Exception in WsSecureClientOptions() " + ex.Message);
            }

            return managedClientOptions;
        }

    private Task OnSubscriberConnected(MqttClientConnectedEventArgs _)
    {
        return Task.CompletedTask;
    }

    private Task OnSubscriberDisconnected(MqttClientDisconnectedEventArgs _)
    {
        return Task.CompletedTask;
    }
 }


Solution

  • As per the comments your code was creating two connections to the broker; once each in:

    • StartSubscriber()
    • StartPublisher()

    Both functions end up creating a connection to the broker with the same client id (as they both use the same WsSecureClientOptions()/WsInsecureOptions()).

    MQTT-3.1.4-2 states:

    If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client.

    So the two connections will end up "fighting" - one will connect, leading to the other disconnecting, it will try to reconnect etc.

    To solve this either:

    • Use one connection (there is no need to use separate connections for subscribing/publishing)
    • Use different client ID's.