c# mqtt

Using MQTTnet, how do I subscribe to a topic, parse messages, and publish back, all in one console application?


I am using MQTTnet and am not very familiar with async in C#, so I need a little help.

I want to subscribe to an MQTT topic (on an external broker such as Mosquitto), parse the messages, and for particular ones publish something back. Pretty simple, and the individual pieces already work :)

This is my subscribe code:

public static async Task Subscribe_Topic() 
{
    /*
     * This sample subscribes to a topic.
     */

    var mqttFactory = new MqttFactory();

    var mqttClient = mqttFactory.CreateMqttClient();

    mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            await ConnectAndSendMessage(); // NOT working

            Console.WriteLine("Received application message.");
            Console.WriteLine(e.ClientId);
            return Task.CompletedTask;
        };

    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

    var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(
            f =>
            {
                f.WithTopic("my_topic");
            })
        .Build();

    var response = await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

    Console.WriteLine("MQTT client subscribed to topic ");

    // The response contains additional data sent by the server after subscribing.
    //response.DumpToConsole();
    //}
} 

Subscribing works, using this ugly while() I came up with :)

static async Task Main(string[] args)
{

    await Subscribe_Topic();


    while (true) ;


}

This is what I use to connect:

    public static MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
        .WithTcpServer("10.77.150.243", 8883)
        .WithTimeout(new TimeSpan(600000000))
        .WithCredentials("mydevice1", "mypass")
        .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
        .WithClientId("MY_ID")
        .WithTls(new MqttClientOptionsBuilderTlsParameters()
        {
            AllowUntrustedCertificates = true,
            SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
            IgnoreCertificateChainErrors = true,
            UseTls = true,
        })
        .Build();

This is my publish function:

    public static async Task ConnectAndSendMessage()
    {
        var mqttFactory = new MqttFactory();

        using (var mqttClient = mqttFactory.CreateMqttClient())
        {

            await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);


            string tmp = @"xxntrary to popular belief, Lorem Ipsum ...";

            var applicationMessage = new MqttApplicationMessageBuilder()
                .WithTopic("my_other_topic")
                .WithRetainFlag(false)
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
                .WithPayload(tmp)
                .Build();

            await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

            await mqttClient.DisconnectAsync();

            Console.WriteLine("MQTT application message is published.");
        }

    }

Now, how can I publish when I receive a certain message?

I could replace that while() with a background worker, but since I already have async code here, maybe that is not needed.

I just don't know how to connect the pieces properly.

Thanks for any hints!


Solution

  • await can only be used inside a method or lambda that is marked async, so the line await ConnectAndSendMessage(); does not compile in your handler as written. Mark the lambda as async, as shown below:

    mqttClient.ApplicationMessageReceivedAsync += async e =>
    {
        await ConnectAndSendMessage(); // compiles now that the lambda is async
        Console.WriteLine("Received application message.");
        Console.WriteLine(e.ClientId);
        return; // optional: an async lambda produces its Task implicitly
    };
    

    Adding the async keyword before the parameter e turns the anonymous function into an async one, and the compiler then produces the returned Task for you. For the same reason return Task.CompletedTask; has to go: in an async lambda it no longer compiles, so either keep a bare return; or omit the return statement entirely.
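
    Two further points from the question are worth a sketch. First, ConnectAndSendMessage() opens a second connection with the same client ID ("MY_ID"); under MQTT a broker closes the existing session when a new client connects with the same ID, so the subscriber is likely to be disconnected by its own reply. Publishing on the already connected client avoids that. Second, awaiting a TaskCompletionSource that completes on Ctrl+C can stand in for the busy while (true); loop. The following is a minimal sketch, assuming the MQTTnet 4.x API used in the question; the broker address, the ShouldReply rule, and the "pong" payload are hypothetical placeholders, not part of the original code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;

public static class Program
{
    // Hypothetical rule: reply only to payloads that start with "ping".
    public static bool ShouldReply(string payload) =>
        payload != null && payload.StartsWith("ping", StringComparison.Ordinal);

    public static async Task Main()
    {
        // Placeholder options; substitute the mqttClientOptions from the question.
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithClientId("MY_ID")
            .Build();

        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Completed on Ctrl+C; awaiting it replaces the busy while (true); loop.
        var shutdown = new TaskCompletionSource<bool>();
        Console.CancelKeyPress += (_, args) =>
        {
            args.Cancel = true; // keep the process alive so we can disconnect cleanly
            shutdown.TrySetResult(true);
        };

        client.ApplicationMessageReceivedAsync += async e =>
        {
            var payload = e.ApplicationMessage.ConvertPayloadToString();
            Console.WriteLine($"Received on {e.ApplicationMessage.Topic}: {payload}");

            if (!ShouldReply(payload))
            {
                return;
            }

            var reply = new MqttApplicationMessageBuilder()
                .WithTopic("my_other_topic")
                .WithPayload("pong")
                .Build();

            // Publish on the same, already connected client instead of opening a new one.
            await client.PublishAsync(reply, CancellationToken.None);
        };

        await client.ConnectAsync(options, CancellationToken.None);

        var subscribeOptions = factory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => { f.WithTopic("my_topic"); })
            .Build();
        await client.SubscribeAsync(subscribeOptions, CancellationToken.None);

        Console.WriteLine("Subscribed. Press Ctrl+C to exit.");
        await shutdown.Task;          // idle here without burning a CPU core
        await client.DisconnectAsync();
    }
}
```

    Unlike while (true);, awaiting shutdown.Task does not keep a CPU core busy, and no background worker is needed because the client raises the handler on its own. If you prefer to keep a separate publishing client, giving it a different client ID should avoid the session takeover.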