I'm building a .NET service that connects to an EMQX broker using MQTTnet. While the client connects to EMQX, when I test sending messages through MQTTX (MQTT client tool), my service never receives them. The subscription doesn't seem to be getting subscribed, despite using the SubscribeAsync
Program.cs (Service Setup)
var builder = Host.CreateApplicationBuilder(args);
// Configure services
builder.Services.AddSingleton<MQTTClient>();
builder.Services.AddHostedService<ServiceWorker>();
// Other configuration...
var host = builder.Build();
await host.RunAsync();
MQTTClient.cs
public class MQTTClient
{
private readonly ILogger<MQTTClient> _logger;
private readonly IConfiguration _configuration;
private readonly IManagedMqttClient _mqttClient;
public MQTTClient(ILogger<MQTTClient> logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
_mqttClient = new MqttFactory().CreateManagedMqttClient();
_mqttClient.ConnectedAsync += async (MqttClientConnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Connected to MQTT server");
};
_mqttClient.DisconnectedAsync += async (MqttClientDisconnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Disconnected from MQTT server");
};
_mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
string connectionstring = _configuration["MQTT:Connectionstring"];
string username = _configuration["MQTT:Username"];
string password = _configuration["MQTT:Password"];
string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";
bool ws = connectionstring.StartsWith("ws");
var options = new MqttClientOptionsBuilder()
.WithClientId(mqttClientId)
.WithCredentials(username, password)
.WithTls()
.WithCleanSession();
if (ws)
options.WithWebSocketServer(connectionstring);
else
options.WithTcpServer(connectionstring);
var builtOptions = options.Build();
var mqttOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(builtOptions)
.WithAutoReconnectDelay(TimeSpan.FromSeconds(3))
.WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)
.Build();
await _mqttClient.StartAsync(mqttOptions);
}
public async Task SubscribeAsync()
{
await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]);
await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]);
}
private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var topic = eventArgs.ApplicationMessage.Topic;
_logger.LogInformation("Message received: {topic}", topic);
try
{
if (topic.EndsWith(_configuration["MQTT:Addtopic"]))
{
_logger.LogInformation("Processing single client add request");
var data = JsonSerializer.Deserialize<RequestData>(eventArgs.ApplicationMessage.Payload);
// Process message...
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message");
}
}
}
ServiceWorker.cs
public class ServiceWorker : BackgroundService
{
private readonly ILogger<ServiceWorker> _logger;
private readonly MQTTClient _mqttClient;
public ServiceWorker(ILogger<ServiceWorker> logger, MQTTClient mqttClient)
{
_logger = logger;
_mqttClient = mqttClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);
await _mqttClient.StartAsync(stoppingToken);
await _mqttClient.SubscribeAsync();
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken);
}
await _mqttClient.StopAsync(stoppingToken);
}
}
Using what @brits provided I was able to get some logs output. Here's what those look like.
[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) <<< ConnAck: [ReturnCode=ConnectionAccepted] [ReasonCode=Success] [IsSessionPresent=False]
[15:18:18 INF] Authenticated MQTT connection with server established.
[15:18:18 INF] Connected.
[15:18:18 INF] Connected to MQTT server
[15:18:18 INF] Start sending keep alive packets.
[15:18:18 INF] Publishing subscriptions at reconnect
[15:18:18 INF] Publishing 2 added and 0 removed subscriptions
[15:18:18 INF] TX (74 bytes) >>> Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]
HandleMessageReceived
method is never triggered when messages are sent via MQTTXWhy isn't my service receiving messages that I publish through MQTTX? The client connects successfully, but messages sent through MQTTX never trigger the HandleMessageReceived
method. What could be preventing the message reception?
In your ACL file, which you can access via https://<broker url>/#/authorization/detail/file
make sure your IP isn't blacklisted and/or you're using an unaccepted string for your Client ID.