NServiceBus Invalid Subscriptions


I am setting up a project using NServiceBus and SQL Server with a publish/subscribe model, and I'm running into an issue that makes no sense. I have two messages, OpportunityMessage and PageVisitMessage, both of which are standard POCO classes that are flagged with IEvent and look like this:

public class OpportunityMessage : IEvent
{
    /// <summary>
    /// Gets or sets the name of the form
    /// </summary>
    public string FormName { get; set; }

    /// <summary>
    /// Gets or sets the page identifier
    /// </summary>
    public byte[] PageIdentifier { get; set; }

    /// <summary>
    /// Gets or sets any supplemental data with a form
    /// </summary>
    public string SupplementalData { get; set; }
}

public class PageVisitMessage : IEvent
{
    /// <summary>
    /// Gets or sets the IP address of the requestor
    /// </summary>
    public string IpAddress { get; set; }

    /// <summary>
    /// Gets or sets the URL that generated a page visit
    /// </summary>
    public string NavigateUrl { get; set; }

    /// <summary>
    /// Gets or sets the referrer
    /// </summary>
    public string Referrer { get; set; }

    /// <summary>
    /// Gets or sets the user agent
    /// </summary>
    public string UserAgent { get; set; }
}

From a publishing perspective, everything works as expected - messages get published based on what's stored in the Subscriptions table I've configured; however, that's where I get confused. Each message type is destined for a different queue, messaging_opportunity_in and messaging_visitors_in, but whenever either message is fired, it shows up in both queues.

I did some digging and looked at the configured Subscriptions table, and each message is configured to go to both queues, so it makes sense that it shows up in both; however, it's not set up that way from a code perspective. Each message type has a dedicated "handler": one for PageVisitMessage and one for OpportunityMessage. The class structure for those looks like this:

public class OpportunityMessageHandler : BaseMessageHandler, IHandleMessages<OpportunityMessage>
{   
    //configured in appsettings.json as "messaging_opportunity_in"
    public override string QueueName => this.Configuration["Messaging:OpportunityPublishQueue"];

    public Task Handle(OpportunityMessage message, IMessageHandlerContext context)
    {
        //code to process the message
    }
}

public class VisitorMessageHandler : BaseMessageHandler, IHandleMessages<PageVisitMessage>
{   
    //configured in appsettings.json as "messaging_visitors_in"
    public override string QueueName => this.Configuration["Messaging:VisitorPublishQueue"];

    public Task Handle(PageVisitMessage message, IMessageHandlerContext context)
    {
        //code to process the message
    }
}

Each of those inherits from BaseMessageHandler which is where the endpoint is configured to listen to the configured queue name via an Initialize method that looks like this:

    /// <summary>
    /// Initializes this message handler.
    /// </summary>
    /// <param name="configuration">The <see cref="IConfiguration"/> instance containing the application's configuration.</param>
    public async Task Initialize(IConfiguration configuration)
    {
        this.Configuration = configuration;

        //setup our base listener
        var listener = new EndpointConfiguration(this.QueueName);
        listener.EnableInstallers();
        listener.SendFailedMessagesTo($"{this.QueueName}_errors");

        //configure the SQL Server transport
        var transport = new SqlServerTransport(Configuration.GetConnectionString("CS"))
        {
            DefaultSchema = this.DefaultSchemaName
        };

        transport.SchemaAndCatalog.UseSchemaForQueue($"{this.QueueName}_errors", this.DefaultSchemaName);
        listener.UseTransport(transport);

        //configure the subscription listener
        transport.Subscriptions.DisableCaching = true;
        transport.Subscriptions.SubscriptionTableName = new NServiceBus.Transport.SqlServer.SubscriptionTableName(this.Configuration["Messaging:Subscriptions"], schema: this.DefaultSchemaName);

        //spin it up
        this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);
    }

What I'm trying to track down is why these two handlers get set up to listen to messages they aren't configured for. When I run the backend message processor that spins these up, the configured Subscriptions table adds both endpoints (messaging_opportunity_in and messaging_visitors_in) as listening to all messages that exist, even though each handler is configured to listen to one specific queue and not all of them. This essentially means every message is duplicated between the two queues, and since the handlers are running, they end up processing everything twice even though each should be responsible solely for its own message type.

Any help would be greatly appreciated!


Solution

  • Nothing in the NServiceBus documentation uses a base class like your BaseMessageHandler on message handlers, and this is where the problem starts.

    You haven't shared all of the code: you refer to BaseMessageHandler but only show the Initialize() method. I'm assuming that method lives in the BaseMessageHandler class, but the code you shared doesn't show how and when it is called.

    Let's dive into this line:

    var listener = new EndpointConfiguration(this.QueueName);
    

    First of all, the variable listener should be called configuration or endpointConfiguration, because it never listens to anything itself. Also, the constructor parameter is endpointName: it expects an endpoint name, not a queue name. For example, with the SQL Server transport your actual queue address would be something like opportunity@dbo@nservicebus.

    Since you mention messaging_opportunity_in and messaging_visitors_in, without having any additional context, I would name the endpoints opportunities and visitors.

    Now let's look at the following line:

    this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);
    

    The Endpoint.Start() method returns the actual endpoint instance, the thing that processes messages. But before it starts, it scans for all types that implement the IHandleMessages<T> interface. If T is an event (in your case, a class marked with IEvent), it creates a subscription for that event.

    Now, I don't know how the Initialize() method on the base class is executed, because you did not include that code. But you execute it twice: once for opportunity and once for visitors. This means the opportunity endpoint starts scanning for classes that implement IHandleMessages<T> and finds two.

    1. OpportunityMessageHandler
    2. VisitorMessageHandler

    It sees two messages, both are events.

    1. OpportunityMessage
    2. PageVisitMessage

    The opportunity endpoint stores the fact that it wants to subscribe to both messages, resulting in the subscriptions you're seeing.

    The next thing that happens is that the visitors endpoint is started and does EXACTLY THE SAME THING. It starts scanning, finds 2 message handlers and 2 events and creates another subscription.
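
    To make the scanning step concrete, here is a simplified model of it. This is only a sketch: IEvent and IHandleMessages<T> are redeclared as empty stand-ins so it compiles without any packages, and ScanningModel.EventsToSubscribe is a hypothetical helper, not NServiceBus's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the NServiceBus interfaces, so the sketch compiles on its own.
public interface IEvent { }
public interface IHandleMessages<T> { }

public class OpportunityMessage : IEvent { }
public class PageVisitMessage : IEvent { }

public class OpportunityMessageHandler : IHandleMessages<OpportunityMessage> { }
public class VisitorMessageHandler : IHandleMessages<PageVisitMessage> { }

public static class ScanningModel
{
    // Given the types a scanner can see, returns the event types it would subscribe to.
    public static List<Type> EventsToSubscribe(IEnumerable<Type> scannedTypes)
    {
        return scannedTypes
            .Where(t => t.IsClass && !t.IsAbstract)
            .SelectMany(t => t.GetInterfaces())
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandleMessages<>))
            .Select(i => i.GetGenericArguments()[0])
            .Where(m => typeof(IEvent).IsAssignableFrom(m))
            .Distinct()
            .ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        // Both endpoints start in the same process, so both scans see the same handler types.
        var typesInProcess = new[] { typeof(OpportunityMessageHandler), typeof(VisitorMessageHandler) };

        foreach (var endpoint in new[] { "messaging_opportunity_in", "messaging_visitors_in" })
        {
            foreach (var evt in ScanningModel.EventsToSubscribe(typesInProcess))
            {
                Console.WriteLine($"{endpoint} subscribes to {evt.Name}");
            }
        }
    }
}
```

    Running this prints four lines, one for every endpoint and event combination, which mirrors the four rows in your Subscriptions table. If you pass only typeof(OpportunityMessageHandler) to EventsToSubscribe, you get a single event back, which is the effect of giving each endpoint its own project.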

    How to solve this

    This assumes you want two components that send messages to each other.

    1. Lose the base class from the message handlers
    2. Create a new Visual Studio project
      1. Add the initialization code for one of the endpoints; either visitors or opportunities
      2. Remove the corresponding message handler from the first project and move it to this new project
    3. Run the Initialize method like you do now, but only once in each project
    4. Start sending messages from visitors to opportunities and vice versa.
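
    As a rough sketch of step 3, the startup code for the opportunities project could look like the following. It assumes the NServiceBus and NServiceBus.Transport.SqlServer packages that your existing code already uses. The endpoint name "opportunities", the "dbo" schema and reading the connection string from an environment variable named CS are placeholders I chose for illustration, and I left out your subscription table settings for brevity.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

public static class Program
{
    public static async Task Main()
    {
        // An endpoint name, not a queue name.
        var endpointConfiguration = new EndpointConfiguration("opportunities");
        endpointConfiguration.EnableInstallers();
        endpointConfiguration.SendFailedMessagesTo("opportunities_errors");

        // Placeholder: your code reads this from IConfiguration instead.
        var connectionString = Environment.GetEnvironmentVariable("CS");
        var transport = new SqlServerTransport(connectionString)
        {
            DefaultSchema = "dbo" // assumed schema name
        };
        endpointConfiguration.UseTransport(transport);

        // Depending on your NServiceBus version, you may also need to choose a serializer explicitly.

        // Only OpportunityMessageHandler lives in this project, so scanning
        // finds one handler and subscribes to one event.
        var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

        Console.WriteLine("Press Enter to stop.");
        Console.ReadLine();
        await endpointInstance.Stop().ConfigureAwait(false);
    }
}
```

    The visitors project would contain the same startup code with its own endpoint name and only the VisitorMessageHandler class.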

    I recommend following the tutorial on the NServiceBus website and contacting Particular Software, the makers of NServiceBus. If you mention my name, we can set up a call and run through this together, and you can even request free help with a proof of concept if you'd like.