Search code examples
c#mqttmqttnet

MQTTNet, get message and print from a server every 2 seconds


I'm using MQTTNet to get the status of a 3D printer every two seconds and print it in the console. The first message prints fine, but the second prints twice, the third three times, and so on. I want each message to print only once.. The method I use to print the status is as follows:

static async Task ObserverAsync(IMqttClient mqttClient, CancellationToken cancellationToken)
{
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {

            //Time span
            await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);

            // Callback function when a message is received
            mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                var msg = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

                if (/*significant condition*/)
                {
                    Console.WriteLine($"Received message: {msg}");
                }

                return Task.CompletedTask;
            };
        }
    }
    catch(TaskCanceledException)
    {
        Console.WriteLine("Observer Disabled");
    }
}

From what I've been researching, it may be a problem with how I handle the "ApplicationMessageReceivedAsync", since it seems that I am subscribing to the event multiple times. I'm not that familiar with the async and event handlers so I'm not sure. After doing a follow-up with the debugger I realized that internally MQTTNet fills a vector called "_handlersForInvoke" every time I call "ApplicationMessageReceivedAsync", maybe that could be a clue, but I really don't know how I can restrict the filling of the vector I don't think it's a good idea to do it "by force" either.


Solution

  • Your current implementation assigns a (new) handler inside a loop, which means that through every iteration, a new handler is assigned, linearly increasing the print messages as you had seen.

    Seeing your situation and intention, it might be better to choose a different approach, such as polling for messages instead of assigning a handler. E.g.

    static async Task ObserverAsync(IMqttClient mqttClient, CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                //Time span
                await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
    
                // Await for a message
                var mqttPacket = await mqttClient.Receive(cancellationToken);
    
                var msg = Encoding.UTF8.GetString(mqttPacket.PayloadSegment);
    
                if (/*significant condition*/)
                {
                    Console.WriteLine($"Received message: {msg}");
                }
            }
        }
        catch(TaskCanceledException)
        {
            Console.WriteLine("Observer Disabled");
        }
    }