Search code examples
c#asp.net-coremqttmqttnet

How to keep BackgroundService task receiving mqtt messages without Console.ReadLine()


In my ASP.Net Core 6 application, a BackgroundService task called MqttClientService runs a MQTTNet client that handles incoming mqqt messages and responds with a message to indicate it was successful.

I have gotten the sample console app from the MQTTNet repo to work using Console.ReadLine(), however this feels like a hack for my use case. Is there a better way to keep the BackgroundService handling incoming messages without restarting constantly?

There is an example with Asp.Net Core and MQTTNet version 3, but it uses handles implemented by interfaces rather than async events that the library now uses: the MQTTNet's Upgrading Guide.

Any information will be appreciated, thank you.

MqttClientService.cs in Services/

using MQTTnet;
using MQTTnet.Client;
using System.Text;

namespace MqttClientAspNetCore.Services
{
    public class MqttClientService : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await Handle_Received_Application_Message();
            }
        }

        public static async Task Handle_Received_Application_Message()
        {

            var mqttFactory = new MqttFactory();

            using (var mqttClient = mqttFactory.CreateMqttClient())
            {
                var mqttClientOptions = new MqttClientOptionsBuilder()
                    .WithTcpServer("test.mosquitto.org")
                    .Build();

                // Setup message handling before connecting so that queued messages
                // are also handled properly. 
                mqttClient.ApplicationMessageReceivedAsync += e =>
                {
                    Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                    Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");

                    // Publish successful message in response
                    var applicationMessage = new MqttApplicationMessageBuilder()
                        .WithTopic("keipalatest/1/resp")
                        .WithPayload("OK")
                        .Build();

                    mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

                    Console.WriteLine("MQTT application message is published.");

                    return Task.CompletedTask;
                };

                await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

                var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f =>
                    {
                        f.WithTopic("keipalatest/1/post");
                        f.WithAtLeastOnceQoS();
                    })
                    .Build();

                await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

                Console.WriteLine("MQTT client subscribed to topic.");
                // The line below feels like a hack to keep background service from restarting
                Console.ReadLine();
            }
        }
    }
}

Program.cs

using MqttClientAspNetCore.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddHostedService<MqttClientService>();

var app = builder.Build();

// To check if web server is still responsive
app.MapGet("/", () =>
{
    return "Hello World";
});


app.Run();

Solution

  • There's no need for Console.ReadLine or even the loop. The BackgroundService application code won't terminate when ExecuteAsync returns. If you want the application to terminate when ExecuteAsync terminates you have to actually tell it to through the IApplicationLifecycle interface.

    I've found this the hard way the first time I tried using a Generic host for a command line tool. Which seemed to hang forever ....

    ExecuteAsync can be used to set up the MQTT client and the event handler and just let them work. The code terminates only when StopAsync is called. Even then, this is done by signaling a cancellation token, not by aborting some worker thread.

    The client itself can be built in the constructor, eg using configuration settings. Only ConnectAsync needs to be called in ExecuteAsync.

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
    
        await _client.ConnectAsync(_clientOptions, CancellationToken.None);
        _logger.LogInformation("Connected");
    
        await _client.SubscribeAsync(_subscriptionOptions, CancellationToken.None);
        _logger.LogInformation("Subscribed");
    }
    

    The service code stops when StopAsync is called and the cancellation token is triggered. stoppingToken.Register could be used to call _client.DisconnectAsync when that happens, but Register doesn't accept an asynchronous delegate. A better option is to override StopAsync itself :

    public virtual async Task StopAsync(CancellationToken cancellationToken)
    {
        await _client.DisconnectAsync();
        await base.StopAsync(cancellationToken);
    }
    

    The constructor can create the client and register the message handler

    public class MqttClientService : BackgroundService
    {
        ILogger<MqttClientService> _logger;
        IMqttClient _client=client;
    
        MqttClientOptions _clientOptions;
        MqttSubscriptionOptions _subscriptionOptions;    
        string _topic;
    
        public MqttClientService(IOptions<MyMqttOptions> options, 
                                ILogger<MqttClientService> logger)
        {
            _logger=logger;
            _topic=options.Value.Topic;
            var factory = new MqttFactory();
            _client = factory.CreateMqttClient();
            _clientOptions = new MqttClientOptionsBuilder()
                            .WithTcpServer(options.Value.Address)
                            .Build();
            _subscriptionOptions = factory.CreateSubscribeOptionsBuilder()
                        .WithTopicFilter(f =>
                        {
                            f.WithTopic(options.Value.Topic);
                            f.WithAtLeastOnceQoS();
                        })
                        .Build();
            _client.ApplicationMessageReceivedAsync += HandleMessageAsync;
        }
    

    Received messages are handled by the HandleMessageAsync method :

    async Task HandleMessageAsync(ApplicationMessageProcessedEventArgs e)
    {
        var payload=Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        _logger.LogInformation("### RECEIVED APPLICATION MESSAGE ###\n{payload}",payload);
        var applicationMessage = new MqttApplicationMessageBuilder()
                        .WithTopic(_topic)
                        .WithPayload("OK")
                        .Build();
    
        await _client.PublishAsync(applicationMessage, CancellationToken.None);
    
        _logger.LogInformation("MQTT application message is published.");
    }
    

    Finally, since BackgroundService implements IDisposable, we can use Dispose to dispose the _client instance :

    public void Dispose()
    {
        Dispose(true);
    }
    
    protected virtual Dispose(bool disposing)
    {
        if(disposing)
        {
            _client.Dispose();
            base.Dispose();
        }
        _client=null;
    }