I use MassTransit in a .NET core web application (web api) to use SQS. It was working fine for publishing messages. But after I tried to add a consumer, I ran into an issue.
Here is my code
public static void UseMassTransit(this IServiceCollection services, MassTransitConfiguration massTransitConfiguration)
{
services.AddMassTransit(x =>
{
x.AddConsumer<CustomerChangeConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(massTransitConfiguration.Host, h =>
{
h.AccessKey(massTransitConfiguration.AccessKey);
h.SecretKey(massTransitConfiguration.SecretKey);
cfg.ReceiveEndpoint("CustomerChangeConsumer",
configurator =>
{
configurator.ConfigureConsumer<CustomerChangeConsumer>(context);
});
// scope topics as well
h.EnableScopedTopics();
});
});
});
services.AddMassTransitHostedService();
}
The problems is that ReceiveEndpoint
gives a NullReferenceException
.
This is strange as ConfigureConsumer
is executed.However, ReceiveEndpoint
just cannot fully finish without throwing this exception.
Here is the stacktrace:
MassTransit.AmazonSqsTransport.Configurators.ConfigurationHostSettings.FormatHostAddress()
System.Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
System.Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
System.Lazy`1.CreateValue()
System.Lazy`1.Value
MassTransit.AmazonSqsTransport.Configurators.ConfigurationHostSettings.HostAddress
MassTransit.AmazonSqsTransport.Configuration.AmazonSqsHostConfiguration.HostAddress
MassTransit.AmazonSqsTransport.Configuration.AmazonSqsReceiveEndpointConfiguration.FormatInputAddress()
System.Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
System.Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
System.Lazy`1.CreateValue()
System.Lazy`1.Value
MassTransit.AmazonSqsTransport.Configuration.AmazonSqsReceiveEndpointConfiguration.InputAddress
MassTransit.Monitoring.Health.EndpointHealth.EndpointConfigured(T configurator)
MassTransit.Monitoring.Health.BusHealth.MassTransit.EndpointConfigurators.IEndpointConfigurationObserver.EndpointConfigured[T](T configurator)
MassTransit.EndpointConfigurators.EndpointConfigurationObservable.<>c__DisplayClass0_0`1.<EndpointConfigured>b__0(IEndpointConfigurationObserver observer)
GreenPipes.Util.Connectable`1.All(Func`2 callback)
MassTransit.EndpointConfigurators.EndpointConfigurationObservable.EndpointConfigured(T configurator)
MassTransit.AmazonSqsTransport.Configuration.AmazonSqsHostConfiguration.CreateReceiveEndpointConfiguration(QueueReceiveSettings settings, IAmazonSqsEndpointConfiguration endpointConfiguration, Action`1 configure)
MassTransit.AmazonSqsTransport.Configuration.AmazonSqsHostConfiguration.CreateReceiveEndpointConfiguration(String queueName, Action`1 configure)
MassTransit.AmazonSqsTransport.Configuration.AmazonSqsHostConfiguration.ReceiveEndpoint(String queueName, Action`1 configureEndpoint)
MassTransit.AmazonSqsTransport.Configurators.AmazonSqsBusFactoryConfigurator.ReceiveEndpoint(String queueName, Action`1 configureEndpoint)
Kinley.SMPD.CustomerService.API.Extensions.MassTransitExtensions.<>c__DisplayClass0_1.<UseMassTransit>b__2(IAmazonSqsHostConfigurator h) in MassTransitExtensions.cs: line: 23
How come? And how to resolve?
You shouldn’t configure receive endpoints inside the host configuration closure. Try moving it outside the .Host()
method and see if that resolves your issue.