Search code examples
c#.net-coreazureservicebusmasstransit

Calling Publish() on MassTransit IPublishEndpoint with Azure Service Bus doesn't return


I experience issues publishing messages to Azure Service Bus using Mass Transit. Calling await Publish() on either IBus or IPublishEndpoint doesnt return. When i switching provider to InMemory it returns as expected.

To narrow down the issue, i used the MassTransit AzureServiceBus sample application https://github.com/MassTransit/Sample-AzureServiceBus. Only setting the connection string in appsettings.json and leaving the default Startup configuration pointing to ASB

services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.UsingAzureServiceBus((_, cfg) =>
    {
        cfg.Host(Configuration.GetConnectionString("AzureServiceBus"));

        cfg.Send<OrderShipped>(s => s.UseSessionIdFormatter(c => c.Message.OrderId.ToString("D")));
    });
});

When hitting the Put() endpoint in ShipmentController the await _publishEndpoint.Publish() call doesn't return. return Ok() is never reached.

namespace Sample.Api.Controllers
{
    using System;
    using System.Threading.Tasks;
    using Contracts;
    using MassTransit;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Logging;


    [ApiController]
    [Route("[controller]")]
    public class ShipmentController :
        ControllerBase
    {
        readonly ILogger<ShipmentController> _logger;
        readonly IPublishEndpoint _publishEndpoint;

        public ShipmentController(ILogger<ShipmentController> logger, IPublishEndpoint publishEndpoint)
        {
            _logger = logger;
            _publishEndpoint = publishEndpoint;
        }

        [HttpPut]
        public async Task<IActionResult> Put(Guid orderId)
        {
            await _publishEndpoint.Publish(new OrderShipped
            {
                OrderId = orderId,
                Timestamp = DateTimeOffset.Now
            });

            return Ok(new {OrderId = orderId});
        }
    }
}

When switching to InMemory the await _publishEndpoint.Publish() in ShipmentController returns as expected.

services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.UsingInMemory((context, cfg) =>
    {
        cfg.Send<OrderShipped>(s => s.UseSessionIdFormatter(c => c.Message.OrderId.ToString("D")));
    });
});

Solution

  • I saw the requests on port 5671 being refused in Wireshark. Switching TransportType to AmqpWebSockets (port 443) as suggested here https://stackoverflow.com/a/60333992/1573728 did the trick, so definitely Firewall issue.

    services.AddMassTransit(x =>
    {
        x.SetKebabCaseEndpointNameFormatter();
    
        x.UsingAzureServiceBus((_, cfg) =>
        {
            cfg.Host(Configuration.GetConnectionString("AzureServiceBus"), cfg => cfg.TransportType = ServiceBusTransportType.AmqpWebSockets);
    
            cfg.Send<OrderShipped>(s => s.UseSessionIdFormatter(c => c.Message.OrderId.ToString("D")));
        });
    }
    

    In my opinion MassTransit should have thrown an Exception at some point since it wasn't able to publish, or simply because of a timeout.