Search code examples
masstransit

Weird transactional outbox behaviour when running multiple WebApplications with MassTransit in same process


I have multiple WebApplications, each with its own separate DI container, running in same process (listening on different ports). Each application adds MassTransit (with RabbitMq) to DI with transactional outbox and also each application has its own DbContext (though all applications are using same physical PG database, just different DB schemas).

Here is how I register MT with outbox:

services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddConsumers(typeof(UpdateSubscriptionStatusOnSubscriptionStatusPublished).Assembly);

    busConfigurator.UsingRabbitMq((context, rabbitCfg) =>
    {
        rabbitCfg.ConfigureJsonSerializerOptions(jsonSerializerOptions =>
        {
            jsonSerializerOptions.ConfigureForNodaTime(new NodaJsonSettings());

            return jsonSerializerOptions;
        });

        rabbitCfg.Host(connectionUri);
        rabbitCfg.ConfigureEndpoints(context);
    });
    
    busConfigurator.AddEntityFrameworkOutbox<DatabaseContext>(o =>
    {
        o.UsePostgres();
        o.UseBusOutbox();
    });
});

Here is how I add outbox entities to db context

protected override void OnModelCreating(ModelBuilder builder)
{
    builder.HasDefaultSchema(InfrastructureConstants.Schema);
    
    builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
    
    builder.AddTransactionalOutboxEntities();
    
    base.OnModelCreating(builder);
}

And so far it was working fine, each application has it's own outbox entities in its db schema, each application has it's own MT outbox hosted service which constantly checks inbox/outbox like in logs below:

[09:17:30 INF SUBSCRIPTIONS] Executed DbCommand (40ms) [Parameters=[], CommandType='Text', CommandTimeout='30'] SELECT m.outbox_id, m.created, m.delivered, m.last_sequence_number, m.lock_id, m.row_version FROM ( SELECT * FROM "subscriptions"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) AS m LIMIT 2 (Command)

...

[09:17:30 INF IDENTITY] Executed DbCommand (40ms) [Parameters=[], CommandType='Text', CommandTimeout='30'] SELECT m.outbox_id, m.created, m.delivered, m.last_sequence_number, m.lock_id, m.row_version FROM ( SELECT * FROM "identity"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) AS m LIMIT 2 (Command)

...

[09:17:30 INF IDENTITY] Executed DbCommand (92ms) [Parameters=[@__removeTimestamp_0='?' (DbType = DateTime), @__p_1='?' (DbType = Int32)], CommandType='Text', CommandTimeout='30'] SELECT i.id, i.consumed, i.consumer_id, i.delivered, i.expiration_time, i.last_sequence_number, i.lock_id, i.message_id, i.receive_count, i.received, i.row_version FROM identity.inbox_state AS i WHERE i.delivered IS NOT NULL AND i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)

...

[09:17:30 INF SUBSCRIPTIONS] Executed DbCommand (92ms) [Parameters=[@__removeTimestamp_0='?' (DbType = DateTime), @__p_1='?' (DbType = Int32)], CommandType='Text', CommandTimeout='30'] SELECT i.id, i.consumed, i.consumer_id, i.delivered, i.expiration_time, i.last_sequence_number, i.lock_id, i.message_id, i.receive_count, i.received, i.row_version FROM subscriptions.inbox_state AS i WHERE i.delivered IS NOT NULL AND i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)

However something weird happens when I add following code to one of the applications (SUBSCRIPTIONS in this case):

app.Lifetime.ApplicationStarted.Register(async () =>
{
    using var scope = app.Services.CreateScope();
    var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
    var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
    await publishEndpoint.Publish(new RefreshPlans());
    await unitOfWork.SaveChangesAsync();
});

After adding this code, ALL applications start using "subscriptions"."outbox_state" but still using correct inbox_state:

[09:32:25 INF IDENTITY] Executed DbCommand (1ms) [Parameters=[], CommandType='Text', CommandTimeout='30'] SELECT m.outbox_id, m.created, m.delivered, m.last_sequence_number, m.lock_id, m.row_version FROM ( SELECT * FROM "subscriptions"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) AS m LIMIT 2 (Command)

...

[09:32:25 INF IDENTITY] Executed DbCommand (2ms) [Parameters=[@__removeTimestamp_0='?' (DbType = DateTime), @__p_1='?' (DbType = Int32)], CommandType='Text', CommandTimeout='30'] SELECT i.id, i.consumed, i.consumer_id, i.delivered, i.expiration_time, i.last_sequence_number, i.lock_id, i.message_id, i.receive_count, i.received, i.row_version FROM identity.inbox_state AS i WHERE i.delivered IS NOT NULL AND i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)

...

// subscriptions working fine and using same inbox/outbox

All messages from IDENTITY application are still saved correctly into "identity.outbox_state" however they are never delivered to broker because IDENTITY checks "subscriptions.outbox_state" for some reason.

This is weird because MT / DbContext / everything is in separate DI for each application so it looks like bug to me (or I am missing some configuration?). It looks like some part of outbox configuration is cached / saved in static memory instead being scoped to DI container?

<PackageVersion Include="MassTransit" Version="8.1.3" />
<PackageVersion Include="MassTransit.Abstractions" Version="8.1.3" />
<PackageVersion Include="MassTransit.EntityFrameworkCore" Version="8.1.3" />
<PackageVersion Include="MassTransit.RabbitMQ" Version="8.1.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.1" />

Using .net core 8.


Solution

  • https://github.com/MassTransit/MassTransit/issues/5125

    When using multiple DbContexts schema caching must be disabled:

     busConfigurator.AddEntityFrameworkOutbox<TDbContext>(o =>
                {
                    // instead of o.UsePostgres();
                    o.LockStatementProvider = new PostgresLockStatementProvider(false);
    
                    o.UseBusOutbox();
                });