NServiceBus Router events published on Amazon SQS transport are not handled by an Azure Service Bus transport endpoint


I've been trying to get NServiceBus.Router working so that endpoints using the AmazonSQS transport and the AzureServiceBus transport can communicate with each other. So far, I am able to get a command sent from the ASB endpoint through the router and handled by the SQS endpoint. However, when I publish an event from the SQS endpoint, it is not handled by the ASB endpoint, even though I have registered the SQS endpoint as a publisher. I have no idea what I'm doing wrong, but judging by every example I can find in the docs, it seems like it should work.

I have already tried adding another forwarding route in the reverse of what is below (SQS to ASB), but that did not solve the issue.

The endpoints and the router are each running in .NET 5 worker services.

I've made a sample project that reproduces the issue here, but here are some quick at-a-glance snippets that show the relevant setup:

Router Setup

var routerConfig = new RouterConfiguration("ASBToSQS.Router");

var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB", t =>
{
    t.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));

    t.Transactions(TransportTransactionMode.ReceiveOnly);
    t.SubscriptionRuleNamingConvention((entityType) =>
    {
        var entityPathOrName = entityType.Name;
        if (entityPathOrName.Length >= 50)
        {
            return entityPathOrName.Split('.').Last();
        }

        return entityPathOrName;
    });
});

var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
{
    t.UnrestrictedDurationDelayedDelivery();

    t.Transactions(TransportTransactionMode.ReceiveOnly);

    var settings = t.GetSettings();

    // Avoids a missing setting error
    //https://github.com/SzymonPobiega/NServiceBus.Raw/blob/master/src/AcceptanceTests.SQS/Helper.cs#L18
    bool isMessageType(Type t) => true;
    var ctor = typeof(MessageMetadataRegistry).GetConstructor(
        BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance, null,
        new[] {typeof(Func<Type, bool>)}, null);
#pragma warning disable CS0618 // Type or member is obsolete
    settings.Set<MessageMetadataRegistry>(ctor.Invoke(new object[] {(Func<Type, bool>) isMessageType}));
#pragma warning restore CS0618 // Type or member is obsolete

});

var staticRouting = routerConfig.UseStaticRoutingProtocol();

staticRouting.AddForwardRoute("ASB", "SQS");

routerConfig.AutoCreateQueues();

ASB Endpoint Setup

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.ASBEndpoint");

var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();

transport.SubscriptionRuleNamingConvention((entityType) =>
{
    var entityPathOrName = entityType.Name;
    if (entityPathOrName.Length >= 50)
    {
        return entityPathOrName.Split('.').Last();
    }

    return entityPathOrName;
});

transport.Transactions(TransportTransactionMode.ReceiveOnly);
transport.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));

var bridge = transport.Routing().ConnectToRouter("ASBToSQS.Router");

bridge.RouteToEndpoint(typeof(ASBToSQSCommand), "ASBToSQSRouter.SQSEndpoint");
bridge.RegisterPublisher(typeof(ASBToSQSEvent), "ASBToSQSRouter.SQSEndpoint");

endpointConfiguration.EnableInstallers();

SQS Endpoint Setup (nothing special because it doesn't need to know about the router)

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.SQSEndpoint");

var transport = endpointConfiguration.UseTransport<SqsTransport>();

transport.UnrestrictedDurationDelayedDelivery();

transport.Transactions(TransportTransactionMode.ReceiveOnly);

endpointConfiguration.EnableInstallers();

Any help would be greatly appreciated!


Solution

  • Unfortunately, one of the recent SQS transport releases introduced subscription batching, a change that by default makes subscribing work only in the context of a full NServiceBus endpoint.

    The Router does not run a full endpoint, only the NServiceBus transport, so for it to work correctly you need to add this magic line to the SQS interface configuration:

    settings.Set("NServiceBus.AmazonSQS.DisableSubscribeBatchingOnStart", true);
    

    This is an undocumented flag that disables subscription batching and allows the Router to complete the subscribe operations normally.

    I am sorry for the inconvenience.
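
    For context, here is a sketch of where that line could sit in the Router setup from the question. It reuses only calls that already appear in the question; the exact position inside the callback (after `GetSettings()`) and the `using` directives are assumptions, so adjust them to the package versions in use.

    ```csharp
    // Namespaces are assumptions based on the packages used in the question.
    using NServiceBus;
    using NServiceBus.Configuration.AdvancedExtensibility; // GetSettings()
    using NServiceBus.Router;

    var routerConfig = new RouterConfiguration("ASBToSQS.Router");

    var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
    {
        t.UnrestrictedDurationDelayedDelivery();
        t.Transactions(TransportTransactionMode.ReceiveOnly);

        var settings = t.GetSettings();

        // The MessageMetadataRegistry workaround from the question stays here, unchanged.

        // Undocumented flag from the answer: turns off subscription batching so the
        // Router (transport only, no full endpoint) can complete its subscribe operations.
        settings.Set("NServiceBus.AmazonSQS.DisableSubscribeBatchingOnStart", true);
    });
    ```

    The ASB interface, the static routing protocol and the forward route from the question are left as they were; only the SQS interface gains the extra setting.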