I'm working with two MQTT client libs: One for Angular 14 (in my browser application), and the other is Mqtt 4.1.4 .Net client for Framework 4.6.
At one point I was publishing an MQTT test message from my front-end application with retain: true
. However, in my .Net logs I kept getting those messages over and over again - HUNDREDS OF THEM.
And now that I turned OFF the retain flag in my front end app, of course now those older retained messages keep flooding my logs.
Two questions :
Why am I getting so many duplicates when Subscribing to topic myapp/from-har-app
.
How can I delete the RETAINED message? I read on HiveMq docs that I can just publish an empty payload on that same topic..
string message = "";
this.Publish(message, "harmony/from-my-app"); // TRY TO DELETE RETAINED MSG !
Here's the OnSubscribe
method in .Net 4.6 :
private Task OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
{
var item = $"{ x.ApplicationMessage.ConvertPayloadToString()}";
applog.Info($"Mqtt OnSubscriberMessageReceived() method - message/topic: {item} {x.ApplicationMessage.Topic}");
return Task.CompletedTask;
}
using log4net.Ext.EventID;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Timer = System.Timers.Timer;
using MqttNotificationService.Models;
using Newtonsoft.Json;
namespace MqttNotificationService
{
public partial class MqttService : ServiceBase
{
public static readonly IEventIDLog applog = EventIDLogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private IManagedMqttClient managedMqttClientPublisher;
private IManagedMqttClient managedMqttClientSubscriber;
private string mqttClientUser = "";
private byte[] mqttClientPswd;
private string mqttBrokerAddress;
private string mqttProtocol = "wss";
private int? mqttPort;
private string defaultMessage;
private string topicThisHost = ""; // get topic from app.config
private string heartBeatPubMsg;
private double heartBeatTimerMs;
public MqttService()
{
InitializeComponent();
}
protected override void OnStart(string[] args)
{
Init();
CreateThreadAndRun();
StartHeartBeatTimer();
}
private void Init()
{
log4net.Config.XmlConfigurator.Configure();
mqttClientUser = ConfigurationManager.AppSettings["MqttClientUser"];
// pulling additional keys here...
}
private void StartPublisherAndSubscriber()
{
StartSubscriber();
_ = StartPublisher();
CheckOtherServers();
}
private void StartHeartBeatTimer()
{
TimeSpan ts = new TimeSpan(0, 0, 5);
Thread.Sleep(ts);
Timer timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(PublishHeartBeat);
timer.Interval = heartBeatTimerMs;
timer.Enabled = true;
}
private void PublishHeartBeat(object source, ElapsedEventArgs e)
{
var message = $"{ this.heartBeatPubMsg}: { MyHostName} {hostIp}";
_ = this.Publish(message, topicThisHost);
this.CheckOtherServers();
}
public async Task Publish(string messageIn, string topic, IManagedMqttClient pubClient = null)
{
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce;
switch (MqttQos)
{
case 0:
qos = MqttQualityOfServiceLevel.AtLeastOnce;
break;
case 1:
qos = MqttQualityOfServiceLevel.AtMostOnce;
break;
case 2:
qos = MqttQualityOfServiceLevel.ExactlyOnce;
break;
}
MqttModel message = new MqttModel();
message.message = messageIn;
message.datestamp = DateTime.Now;
message.source = "";
message.status = "";
var payload = JsonConvert.SerializeObject(message, Formatting.Indented);
var send = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.WithRetainFlag(false)
.Build();
if (this.managedMqttClientPublisher == null)
{
this.managedMqttClientPublisher = pubClient;
}
if (this.managedMqttClientPublisher != null)
{
try
{
applog.Debug($"Mqtt Service Publish() method - about to pub mqtt message EnqueueAsync() - {messageIn} / {topic} ");
await this.managedMqttClientPublisher.EnqueueAsync(send);
MonitoringLogs logs = new MonitoringLogs();
logs.InsertIntoLog(message);
}
catch (Exception ex)
{
string errorMessage = $"Exception occured in Publish() method. {ex.Message}";
applog.Error(errorMessage);
throw new Exception(errorMessage);
}
}
else
{
applog.Info($"Mqtt Service Publish() method - managedMqttClientPublisher object appears to be NULL");
}
}
public ManagedMqttClientOptions WsSecureClientOptions()
{
string assemblyPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(MqttService)).CodeBase);
// Building out the secure wss url (both pfx/crt certificate file types appear to work here)
var url = $"{mqttBrokerAddress}:{mqttPort}/mqtt";
X509Certificate2 x509Cert = null;
var file = CertificateFileName;
var filePath = Path.Combine(assemblyPath, file).Remove(0, 6);
// pfx file contains both pub and priv keys (needs pswd); crt file only has pub key (no pswd req'd)
if (Path.GetExtension(CertificateFileName.ToLower()) == ".pfx")
{
// using a PFX cert file via the X509 class
x509Cert = new X509Certificate2(filePath, CertificatePwd);
}
else if (Path.GetExtension(CertificateFileName.ToLower()) == ".crt")
{
x509Cert = new X509Certificate2(filePath);
}
applog.Debug($"In WsSecureClientOptions(), Certificate Path - {filePath}");
var clientOptionsBldr = new MqttClientOptionsBuilder()
.WithProtocolVersion(MqttProtocolVersion.V500)
.WithWebSocketServer(url)
.WithCredentials(mqttClientUser, mqttClientPswd)
.WithClientId(clientId)
.WithCleanSession()
.WithCredentials(mqttClientUser, mqttClientPswd)
.WithTls(
new MqttClientOptionsBuilderTlsParameters()
{
UseTls = true,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
Certificates = new List<X509Certificate2>() { x509Cert }
});
ManagedMqttClientOptions managedClientOptions = null;
try
{
applog.Debug($"In WsSecureClientOptions(), about to Build Publisher - ${url}");
managedClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptionsBldr)
.Build();
}
catch (Exception ex)
{
applog.Error("CERT ERROR ! Exception in WsSecureClientOptions() " + ex.Message);
}
return managedClientOptions;
}
private Task OnSubscriberConnected(MqttClientConnectedEventArgs _)
{
return Task.CompletedTask;
}
private Task OnSubscriberDisconnected(MqttClientDisconnectedEventArgs _)
{
return Task.CompletedTask;
}
}
Here's my log just now, after debugging my .Net project locally - the older retained message from 11:19:37 is still hitting my logs. The newest test msg
I sent is the 3rd from bottom at 11:49
:
2023-03-24 11:48:24 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:48:57 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:48:32 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:04 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:06 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:09 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:16 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:17 INFO OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:18 ( DEBUG Mqtt Publish() method - I am alive, from MqttWinSvc: : ... @2023-03-24 11:49:18 AM /client/internal/admin/host/MyHostName
2023-03-24 11:49:19 INFO Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:26 INFO Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:27 INFO Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
*** (NEW MSG) 2023-03-24 11:49:33 (null) [20] INFO Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:49:32 AM myapp/from-har-app ***
2023-03-24 11:49:40 (null) [28] INFO Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
2023-03-24 11:49:46 (null) [30] INFO Mqtt Service OnSubscriberMessageReceived() method - message/topic: Hello from browser - 11:19:37 AM myapp/from-har-app
It just seems that I'm getting too many duplicates message in both front-end and .Net clients. I'm also tweaking the QoS value btwn 0/1/2 to see what happens.
Thanks for the advice...
To clear a retained message from a topic you must do the following:
null
(empty) payloadYou can do this explicitly with the latest version of the mosquitto_sub
tool, e.g.
mosquitto_sub -t '#' --remove-retained --retained-only
This will connect to a broker on localhost and remove ALL the retained messages.