c# autofac masstransit

Mass Transit - only single Consumer "handler" is processing messages


I am in the process of converting our project from an in-house receiver/dispatcher implementation to Mass Transit.

Things initially went well, but I have run into an issue whose cause I am struggling to determine.

Note: as we are migrating from an old implementation to a new implementation, the naming of some of our internal components might look strange.

Consumers

We are implementing our consumers inside classes, grouped by purpose.

I have this test grouping:

public class TestCommandHandler : 
    IConsumer<PingCommand>,
    IConsumer<TestCommand>
{
    private readonly ILoggingProvider _logger;

    public TestCommandHandler(
        ILoggingProvider loggingProvider)
    {
        _logger = loggingProvider;
    }

    public Task Consume(ConsumeContext<TestCommand> cmd)
    {
        _logger.Info(cmd.Message.Message);
        if (cmd.SentTime.HasValue)
        {
            _logger.Info($"Sent to {cmd.SessionId()} at {cmd.SentTime} - duration to now {DateTime.UtcNow.Subtract(cmd.SentTime.Value)}");
        }
        return Task.CompletedTask;
    }

    public async Task Consume(ConsumeContext<PingCommand> cmd)
    {
        _logger.Info($"Received Ping Command for {cmd.Message.OperationId}");
        _logger.Info($"Checking compression...");

        // do some checks and throw if an issue
    }
}

I also have this "polling" result grouping:

public class PollCommandHandler :
    IConsumer<PollResult>
{
    private readonly IAggregateRepository<PollResult> _aggregateRepository;

    public PollCommandHandler(IAggregateRepository<PollResult> aggregateRepository)
    {
        _aggregateRepository = aggregateRepository;
    }

    public async Task Consume(ConsumeContext<Model.Commands.PollResult> cmd)
    {
        var poll = _aggregateRepository.Find(cmd.Message.Id);

        poll.AddPollResult(cmd.Message.RetailerId.ToString(), cmd.Message.PollDate, cmd.Message.PollResponseStatus, cmd.Message.BlobUri);

        await _aggregateRepository.SaveAsync(poll, cmd.Message.Id.ToString());
    }
}

Bus setup

During start-up of our host service, we're using Autofac to register all of our components (consumerAssembly is the Assembly that has the above Consumers / Handlers):

builder.AddMassTransit(mt =>
{
    mt.AddConsumers(consumerAssembly);
    mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
    {
        x.RequiresSession = true;
        x.MaxConcurrentCalls = 500;
        x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
        x.UseRenewLock(TimeSpan.FromMinutes(4));

        var host = x.Host(serviceUri, h =>
        {
            h.SharedAccessSignature(s =>
            {
                s.KeyName = _config.Infrastructure.AzureServiceBusSharedSecretIssuer;
                s.SharedAccessKey = _config.Infrastructure.AzureServiceBusSharedSecretValue;
                s.TokenTimeToLive = TimeSpan.FromDays(1);
                s.TokenScope = TokenScope.Namespace;
            });
        });

        x.ReceiveEndpoint(host, $"mt.{QueueName.Command.ToString()}", ep =>
        {
            ep.RequiresSession = true;
            ep.MaxConcurrentCalls = 500;
            ep.RemoveSubscriptions = true;

            ep.ConfigureConsumers(context);        
        });
    }));
});

The issue I am seeing is that all of the commands that are handled by TestCommandHandler are received and processed. This is good.

But, when I send a PollResult command, which is handled by the PollCommandHandler, the messages are not consumed - rather, they are moved to the skipped queue, and I'm not sure why. This is bad.

As I understand it, mt.AddConsumers(consumerAssembly); should automatically scan and register the consumers we have defined, and ep.ConfigureConsumers(context); will automatically wire them up, so that the ReceiveEndpoint knows where to dispatch the message to.

I have run GetProbeResult() on the bus after configuration, and as far as I can tell, all of the Consumers are configured properly.

Looking for either a pointer to something that I've done wrong in setting this up, or some guidance on how I can diagnose the issue.


Solution

  • Well, this is embarrassing.

    It turns out we have both a PollCommandHandler and an FtpPollCommandHandler, with corresponding PollResult and FtpPollResult commands.

    I had migrated PollCommandHandler to be a Consumer, but was testing with the FtpPollResult command.

    Thus, Mass Transit was moving the message to the skipped queue, because there truly was no consumer associated with it.
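
    For anyone chasing a similar "skipped queue" problem, a quick reflection check along these lines might have caught the mismatch sooner. This is only a rough sketch, not anything Mass Transit provides: ConsumerScan and HandledMessageTypes are hypothetical names. It assumes the consumers are concrete classes in the scanned assembly, and it does not reproduce whatever extra filtering AddConsumers applies.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical diagnostic helper; not part of Mass Transit.
public static class ConsumerScan
{
    // Collects every type argument T for which some concrete class in the
    // assembly implements openInterface<T>. Pass typeof(IConsumer<>) to see
    // which message types have at least one consumer class.
    public static ISet<Type> HandledMessageTypes(Assembly assembly, Type openInterface)
    {
        var handled = assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .SelectMany(t => t.GetInterfaces())
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == openInterface)
            .Select(i => i.GetGenericArguments()[0]);

        return new HashSet<Type>(handled);
    }
}
```

    Calling ConsumerScan.HandledMessageTypes(consumerAssembly, typeof(IConsumer<>)) and checking whether the result contains typeof(FtpPollResult) would presumably have returned false in our case, which points straight at the missing consumer.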