I am in the process of converting our project from an inhouse receiver/dispatcher implementation to Mass Transit.
Things were initially moved forward well, but I have run into an issue that I am struggling to determine the cause of.
Note: as we are migrating from an old implementation to a new implementation, the naming of some of our internal components might look strange.
Consumers
We are implementing our consumers inside classes, grouped by purpose.
I have this test grouping:
public class TestCommandHandler :
IConsumer<PingCommand>,
IConsumer<TestCommand>
{
private readonly ILoggingProvider _logger;
public TestCommandHandler(
ILoggingProvider loggingProvider
{
_logger = loggingProvider;
}
public Task Consume(ConsumeContext<TestCommand> cmd)
{
_logger.Info(cmd.Message.Message);
if (cmd.SentTime.HasValue)
{
_logger.Info($"Sent to {cmd.SessionId()} at {cmd.SentTime} - duration to now {DateTime.UtcNow.Subtract(cmd.SentTime.Value)}");
}
return Task.CompletedTask;
}
public async Task Consume(ConsumeContext<PingCommand> cmd)
{
_logger.Info($"Received Ping Command for {cmd.Message.OperationId}");
_logger.Info($"Checking compression...");
// do some checks and throw if an issue
}
}
I also have this "polling" result grouping:
public class PollCommandHandler :
IConsumer<PollResult>
{
private readonly IAggregateRepository<PollResult> _aggregateRepository;
public PollCommandHandler(IAggregateRepository<PollResult> aggregateRepository)
{
_aggregateRepository = aggregateRepository;
}
public async Task Consume(ConsumeContext<Model.Commands.PollResult> cmd)
{
var poll = _aggregateRepository.Find(cmd.Message.Id);
poll.AddPollResult(cmd.Message.RetailerId.ToString(), cmd.Message.PollDate, cmd.Message.PollResponseStatus, cmd.Message.BlobUri);
await _aggregateRepository.SaveAsync(poll, cmd.Message.Id.ToString());
}
Bus setup
During start-up of our host service, we're using Autofac
to register all of our components (consumerAssembly
is the Assembly
that has the above Consumers / Handlers):
builder.AddMassTransit(mt =>
{
mt.AddConsumers(consumerAssembly);
mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MaxConcurrentCalls = 500;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = _config.Infrastructure.AzureServiceBusSharedSecretIssuer;
s.SharedAccessKey = _config.Infrastructure.AzureServiceBusSharedSecretValue;
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
});
x.ReceiveEndpoint(host, $"mt.{QueueName.Command.ToString()}", ep =>
{
ep.RequiresSession = true;
ep.MaxConcurrentCalls = 500;
ep.RemoveSubscriptions = true;
ep.ConfigureConsumers(context);
});
}));
});
The issue I am seeing is that all of the commands that are handled by TestCommandHandler
are received and processed. This is good.
But, when I send a PollResult
command, which is handled by the PollCommandHandler
, the messages are not consumed - rather, they are moved to the skipped
queue, and I'm not sure why. This is bad.
As I understand it, mt.AddConsumers(consumerAssembly);
should automatically scan and register the consumers we have defined, and ep.ConfigureConsumers(context);
will automatically wire them up, so that the ReceiveEndpoint
knows where to dispatch the message to.
I have run a GetProbeResult()
on the bus post configuration, and as far as I can tell, all of the Consumers are configured properly.
Looking for either a pointer to something that I've done wrong in setting this up, or some guidance on how I can diagnose the issue.
Well, this is embarrassing.
It looked like we have a PollCommandHandler
and an FtpPollCommandHandler
, with a corresponding PollResult
and FtpPollResult
commands.
I had migrated PollCommandHandler
to be a Consumer, but was testing with the FtpPollResult
command.
Thus, Mass Transit was moving the message to the skipped queue, because there truly was no consumer associated with it.