Search code examples
c#integration-testingmasstransit

How to start the MassTransit bus when integration testing using Testcontainers instead of the in-memory harness?


I've got an API which uses MassTransit and RabbitMQ (docker container) as the messaging protocol/layer and I'm trying to write some integration tests (WebApplicationFactory) using Testcontainers, however, the MassTransit Bus is never started, so the messages are never consumed. The plan is not to use the MassTransit in-memory harness as we want to mimic a real environment. Has anyone had this problem? Is there a way to manually start the Bus?

When IRabbitMqBusFactoryConfigurator.ConfigureEndpoints is called in the integration test the endpoints/queues/exchanges are configured/mapped but never declared/bound and subsequentially the Bus is never started (which works fine when I spin up the API itself, binding all queues and consumers in one go). The only time, while running the test, a queue/exchange is declared/bound, is when ISendEndpointProvider.Send is called, but then the consumer is never hit/executed even though the message exists in the queue.

The log when ISendEndpointProvider.Send is called while the integration test is running (consumer never called though):

enter image description here

MassTransit configuration:

private static void ConfigureMassTransitWithRabbitMq<TMarker>(this IServiceCollection services, IConfiguration configuration)
    where TMarker : IMarker
{
    services.Configure<QueueSettings>(configuration.GetSection(QueueSettings.SettingName));

    // Change endpoint name formatter globally - will prefix service name
    services.AddSingleton<IEndpointNameFormatter>(
        static serviceProvider =>
        {
            var serviceSettings = serviceProvider.GetRequiredService<IOptions<ServiceSettings>>();

            return new KebabCaseEndpointNameFormatter(serviceSettings.Value.ServiceName, false);
        });

    services.AddMassTransit(
        static configure =>
        {
            configure.AddConsumers(Assembly.GetExecutingAssembly(), typeof(TMarker).Assembly);

            configure.UsingRabbitMq(
                static (context, configurator) =>
                {
                    var serviceSettings = context.GetRequiredService<IOptions<ServiceSettings>>();

                    using var scope = context.CreateScope();

                    var queueSettings = scope.ServiceProvider.GetRequiredService<IOptionsSnapshot<QueueSettings>>();

                    configurator.Host(queueSettings.Value.Hostname);

                    configurator.UseInstrumentation(serviceName: serviceSettings.Value.ServiceName);

                    configurator.UseMessageRetry(
                        retryConfigurator =>
                        {
                            retryConfigurator.Interval(queueSettings.Value.Retries ?? throw new ArgumentNullException(nameof(queueSettings.Value.Retries)),
                                                       queueSettings.Value.RetriesTimeSeconds ?? throw new ArgumentNullException(nameof(queueSettings.Value.RetriesTimeSeconds)));
                        });

                    configurator.ClearSerialization();
                    configurator.UseRawJsonSerializer();
                    configurator.ConfigureJsonSerializerOptions(
                        static options =>
                        {
                            var serializerOptions = Application.UI.Dependencies.JsonSerializerOptions;

                            options.IncludeFields = serializerOptions.IncludeFields;
                            options.PropertyNameCaseInsensitive = serializerOptions.PropertyNameCaseInsensitive;

                            foreach (var converter in serializerOptions.Converters)
                            {
                                options.Converters.Add(converter);
                            }

                            options.TypeInfoResolver = serializerOptions.TypeInfoResolver;

                            return options;
                        });

                    configurator.ConfigureEndpoints(context, context.GetRequiredService<IEndpointNameFormatter>());
                });
        });
}

typeof(TMarker).Assembly in AddConsumers points to the SUT so consumers were correctly mapped.

Expectation:

(1) Configure endpoint/consumer -> (2) Declare Queues/Exchanges - (3) Bind Queues/Exchanges -> (4) Consumers OK -> (5) Endpoints Ready -> (6) Bus Started.

Result:

Only (1) and (2) happens, (2) only after Send called and for the specific endpoint only.

Hope that makes sense.


Solution

  • If I were to guess, the bus isn't being started by the test environment. Your test set up, if it's custom, should be sure to start and stop all hosted services on startup and shutdown to ensure things are cleaned up.

    Leaving a running bus after test execution is going to create all sort of unreliable tests since you'd have multiple buses consuming from endpoints.

    For instance, the test harness Start method call:

    var hostedServices = _provider.GetServices<IHostedService>().ToArray();
    
    foreach (var service in hostedServices)
        await service.StartAsync(CancellationToken).ConfigureAwait(false);
    

    And does the opposite on shutdown (in .Reverse() order, of course).