Search code examples
c#multithreadingasync-awaitthread-safetymqttnet

How should an MQTTnet client's lifecycle be managed?


tl;dr: How can I avoid disposing the MQTTnet client while it is in use on another thread? Perhaps this pertains to any IDisposable, but in the case of ManagedMqttClient, there are also calls like IsConnected to worry about before async calls.

Note: We are on MQTTnet v3.0.16. I'm open to answers that include "upgrade to latest, then use approach X"

I inherited an application which uses the ManagedMqttClient and originally replaced/disposed that client when the user made changes to broker settings:

using MQTTnet;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Threading.Tasks;

internal class OriginalApproach
{
    private IManagedMqttClient _mqttClient;
    private static MqttFactory _factory;

    public OriginalApproach()
    {
        _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedEventArgs => OnDisconnect(MqttClientDisconnectedEventArgs));
    }

    //Called if the user changes settings that affect the way we connect
    //to the broker.
    public async void OnSettingsChange()
    {
        if (_mqttClient != null && _mqttClient.IsConnected)
        {
            StopAsync();
            return;
        }

        //Disposal isn't the only thread safety issue
        if (_mqttClient != null && _mqttClient.IsStarted)
        {
            await Reconnect(TimeSpan.FromSeconds(2));
        }
    }

    public async void StopAsync()
    {
        if (_mqttClient != null)
        {
            await _mqttClient.StopAsync();
            await Task.Delay(System.TimeSpan.FromSeconds(2));
        }
    }

    public async void OnDisconnect(MqttClientDisconnectedEventArgs e)
    {
        await Reconnect(TimeSpan.FromSeconds(5));
    }

    public async Task Reconnect(TimeSpan delay)
    {
        StopAsync();
        await Task.Delay(delay);
        Connect();
    }

    public async void Connect()
    {
        await CreateManagedClient();

        try
        {
            if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
            {
                StartAsync();
            }
        }
        catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */  }
        catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */  }
    }

    public async Task<bool> CreateManagedClient()
    {
        try
        {
            if (_mqttClient != null)
                _mqttClient.Dispose();

            _factory = new MqttFactory();
            _mqttClient = _factory.CreateManagedMqttClient();
            await Task.Delay(System.TimeSpan.FromSeconds(2));
        }
        catch (Exception e)
        {
            _mqttClient.Dispose();
            _mqttClient = null;
            return false;
        }
        return true;
    }

    public async void StartAsync()
    {
        MqttApplicationMessage mess = new MqttApplicationMessage();

        mess.Payload = BuildDeathCertificate();
        mess.Topic = "...";

        MqttClientOptionsBuilder clientOptionsBuilder = new MqttClientOptionsBuilder();

        IMqttClientOptions options = clientOptionsBuilder.WithTcpServer("Broker Address", 1234)
                .WithClientId("ABCD")
                .WithCleanSession(true)
                .WithWillMessage(mess)
                .WithKeepAlivePeriod(new System.TimeSpan(1234))
                .WithCommunicationTimeout(new System.TimeSpan(int.MaxValue))
                .Build();

        var managedClientOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(options)
            .Build();

        if (!_mqttClient.IsStarted && !_mqttClient.IsConnected)
        {
            try
            {
                await _mqttClient.StartAsync(managedClientOptions);
            }
            catch (Exception e) { /* ... */  }
        }
    }

    byte[] BuildDeathCertificate()
    {
        return new byte[1234];
    }

    public async void PublishMessage(byte[] payloadBytes)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic("...")
            .WithPayload(payloadBytes)
            .WithExactlyOnceQoS()
            .WithRetainFlag(false)
            .Build();

        try
        {
            await _mqttClient.PublishAsync(message);
        }
        catch (NullReferenceException e) { /* ... */  }
    }
}

Obviously, there are numerous thread-safety issues here, and various situations have yielded ObjectDisposed exceptions.

I played with using a single ManagedMqttClient for the lifetime of the application:

internal class SingleClientTest
{
    private IManagedMqttClient _mqttClient;
    public SingleClientTest()
    {
        var factory = new MqttFactory();

        //Used for lifetime of application
        _mqttClient = factory.CreateManagedMqttClient();
    }

    public async void Connect()
    {
        //No longer calling CreateManagedClient() here

        try
        {
            if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
            {
                StartAsync();
            }
        }
        catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */  }
        catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */  }
    }

    //The other methods are mostly unchanged
}

Overall it solves the ObjectDisposed issue, but it doesn't address thread-safety of calling IsConnected before the async calls. And, given that MqttFactory exists, reusing one client feels like a hack. Also, I've run into one use case that acts a bit like this issue. Specifically, StartAsync() yielded the exception "The managed client is already started" despite IsStarted being false. I can provide more detail if desired, but for now I'll avoid muddying the question.

I also explored adding locks around calls to the client, but they cannot be used around awaited calls because of deadlock risk.

Finally, I've read through the MQTTnet samples, wiki, a few of the issues, and poked around the code a bit. So far, I haven't found additional concurrency mechanisms in the library.

I'm exploring a few options (perhaps a combination of these):

  1. Using SemaphorSlim around all calls to the client, as described here - It looks like it may work around awaited calls. Not sure if this would introduce new timing issues, and given that we are on .NET Framework, use appears to come with risks
  2. Using MqttClient, as opposed to ManagedMqttClient. This thread makes it sound like MqttClient is preferred. Should I be using it instead? Is it reasonable to use one MqttClient for the life of the app (using DisconnectAsync()/ConnectAsync() when broker settings change)? (This still doesn't address checks like _mqttClient.IsConnected)
  3. Surround every call to the client object with a try/catch for ObjectDisposed exceptions, and replace the client like this:
var oldClient = _mqttClient
_mqttClient = _factory.CreateManagedMqttClient();
oldClient?.Dispose();

Again, this doesn't address checks like _mqttClient.IsConnected.

Just wondering if anyone can offer insight as to the commonly accepted way of doing this.


Solution

  • I also work with real-time communications (OPC DA) and would like to provide some recommendations based on my experience.

    The approach using a single client looks the most resonable in case of multi-threaded application. Just you need creating a full-function wrapper specially for your application to prevent access to "dangerous" methods and mainly to provide only necessary functionality.

    You client should be run in the separate thread and provide methods to start, shutdown and restart to be used in the application main cycle.

    Also the client should incorporate a watchdog for the self-control.

    UPD

    How to "solve issues like the potential for interruption between calling IsConnected and StartAsync()"? Use wrapper and prevent direct call (of IsConnected and StartAsync()) the MqttClient. Wrapper provides a single entry point for each dangerous method and can internally manage all calls. Use a call queue for it.

    The watchdog typically is the timer which monitors operation, determines the status (like IsConnected), and holds/restarts the client if necessary.