I experience issues publishing messages to Azure Service Bus using Mass Transit. Calling await Publish()
on either IBus
or IPublishEndpoint
doesnt return. When i switching provider to InMemory
it returns as expected.
To narrow down the issue, i used the MassTransit AzureServiceBus sample application https://github.com/MassTransit/Sample-AzureServiceBus. Only setting the connection string in appsettings.json and leaving the default Startup configuration pointing to ASB
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingAzureServiceBus((_, cfg) =>
{
cfg.Host(Configuration.GetConnectionString("AzureServiceBus"));
cfg.Send<OrderShipped>(s => s.UseSessionIdFormatter(c => c.Message.OrderId.ToString("D")));
});
});
When hitting the Put()
endpoint in ShipmentController
the await _publishEndpoint.Publish()
call doesn't return. return Ok()
is never reached.
namespace Sample.Api.Controllers
{
using System;
using System.Threading.Tasks;
using Contracts;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
[ApiController]
[Route("[controller]")]
public class ShipmentController :
ControllerBase
{
readonly ILogger<ShipmentController> _logger;
readonly IPublishEndpoint _publishEndpoint;
public ShipmentController(ILogger<ShipmentController> logger, IPublishEndpoint publishEndpoint)
{
_logger = logger;
_publishEndpoint = publishEndpoint;
}
[HttpPut]
public async Task<IActionResult> Put(Guid orderId)
{
await _publishEndpoint.Publish(new OrderShipped
{
OrderId = orderId,
Timestamp = DateTimeOffset.Now
});
return Ok(new {OrderId = orderId});
}
}
}
When switching to InMemory the await _publishEndpoint.Publish()
in ShipmentController
returns as expected.
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingInMemory((context, cfg) =>
{
cfg.Send<OrderShipped>(s => s.UseSessionIdFormatter(c => c.Message.OrderId.ToString("D")));
});
});
I saw the requests on port 5671 being refused in Wireshark. Switching TransportType
to AmqpWebSockets
(port 443) as suggested here https://stackoverflow.com/a/60333992/1573728 did the trick, so definitely Firewall issue.
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingAzureServiceBus((_, cfg) =>
{
cfg.Host(Configuration.GetConnectionString("AzureServiceBus"), cfg => cfg.TransportType = ServiceBusTransportType.AmqpWebSockets);
cfg.Send<OrderShipped>(s => s.UseSessionIdFormatter(c => c.Message.OrderId.ToString("D")));
});
}
In my opinion MassTransit should have thrown an Exception at some point since it wasn't able to publish, or simply because of a timeout.