I'm having a concurrency issue with MassTransit sagas.
I'm currently working on a POC with this flow:
I sometimes get exceptions like this Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded.
in step 4, and the state change is not persisted.
Here is the business logic code:
public interface IInitialSagaEvent : CorrelatedBy<Guid> { }
public interface IExternalCheckRequest : CorrelatedBy<Guid> { }
public interface IExternalCheckOk : CorrelatedBy<Guid> { }
public interface IExternalCheckNotOk : CorrelatedBy<Guid> { }
public class MySaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public byte[] RowVersion { get; set; }
}
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
public MyStateMachine()
{
InstanceState(instance => instance.CurrentState);
Initially(
When(InitialSagaEvent)
.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
.TransitionTo(AwaitingExternalCheck)
);
During(AwaitingExternalCheck,
Ignore(InitialSagaEvent),
When(ExternalCheckOk)
.TransitionTo(CheckedOk),
When(ExternalCheckNotOk)
.TransitionTo(CheckedNotOk)
);
During(CheckedOk,
When(InitialSagaEvent)
.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
.TransitionTo(AwaitingExternalCheck)
);
During(CheckedNotOk,
When(InitialSagaEvent)
.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
.TransitionTo(AwaitingExternalCheck)
);
}
public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
public Event<IExternalCheckOk> ExternalCheckOk { get; private set; }
public Event<IExternalCheckNotOk> ExternalCheckNotOk { get; private set; }
public State AwaitingExternalCheck { get; private set; }
public State CheckedOk { get; private set; }
public State CheckedNotOk { get; private set; }
}
public class ExternalCheckRequestConsumer : IConsumer<IExternalCheckRequest>
{
private readonly IExternalChecker externalChecker;
public ExternalCheckRequestConsumer(IExternalChecker externalChecker)
{
this.externalChecker = externalChecker;
}
public async Task Consume(ConsumeContext<IExternalCheckRequest> context)
{
var ok = await externalChecker.PerformCheck(context.Message, context.CancellationToken);
if (ok)
{
await context.Publish<IExternalCheckOk>(new { context.Message.CorrelationId }, context.CancellationToken);
}
else
{
await context.Publish<IExternalCheckNotOk>(new { context.Message.CorrelationId }, context.CancellationToken);
}
}
}
public interface IExternalChecker
{
Task<bool> PerformCheck(IExternalCheckRequest request, CancellationToken cancellationToken);
}
public class Publisher
{
private readonly IPublishEndpoint publishEndpoint;
public Publisher(IPublishEndpoint publishEndpoint)
{
this.publishEndpoint = publishEndpoint;
}
public async Task Publish(Guid correlationId, CancellationToken cancellationToken)
{
await publishEndpoint.Publish<IInitialSagaEvent>(new { CorrelationId = correlationId }, cancellationToken);
}
}
Here it the configuration code
public class MySagaDbContext : SagaDbContext
{
public MySagaDbContext(DbContextOptions<MySagaDbContext> options) : base(options) { }
protected override IEnumerable<ISagaClassMap> Configurations
{
get
{
yield return new MySagaClassMap();
}
}
}
public class MySagaClassMap : SagaClassMap<MySaga>
{
protected override void Configure(EntityTypeBuilder<MySaga> entity, ModelBuilder model)
{
entity.Property(x => x.CurrentState).HasMaxLength(128);
entity.Property(x => x.RowVersion).IsRowVersion();
}
}
public class ExternalCheckRequestConsumerDefinition : ConsumerDefinition<ExternalCheckRequestConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator) =>
endpointConfigurator.UseRetry(r =>
{
r.Handle<DbUpdateConcurrencyException>();
// This is the SQLServer error code for duplicate key
r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
r.Interval(5, TimeSpan.FromMilliseconds(100));
});
}
public class Program
{
public static async Task Main(string[] args)
{
var services = new ServiceCollection();
services.AddDbContext<DbContext, MySagaDbContext>((provider, builder)
=> builder.UseSqlServer(connectionString, m =>
{
m.MigrationsAssembly(typeof(MySagaDbContext).Assembly.GetName().Name);
m.MigrationsHistoryTable($"__EFMigrationsHistory_Sagas");
}));
services.AddMassTransit(configureMassTransit =>
{
configureMassTransit.AddConsumer<ExternalCheckRequestConsumer, ExternalCheckRequestConsumerDefinition>();
configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
r.ExistingDbContext<MySagaDbContext>();
});
configureMassTransit.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(true));
configureMassTransit.UsingActiveMq((context, config) =>
{
config.Host("artemis", 61616, configureHost =>
{
configureHost.Username("admin");
configureHost.Password("admin");
});
config.UseInMemoryOutbox(); // ref https://masstransit-project.com/articles/outbox.html#the-in-memory-outbox
config.EnableArtemisCompatibility();
config.ConfigureEndpoints(context);
});
});
var serviceProvider = services.BuildServiceProvider();
var busControl = serviceProvider.GetRequiredService<IBusControl>();
await busControl.StartAsync();
await RunPoc(serviceProvider);
}
private static async Task RunPoc(IServiceProvider serviceProvider)
{
await Task.CompletedTask;
}
static string connectionString = string.Empty;
}
My guess is that I need to get in a UseRetry at the correct point, so I've tried to configure the AddSagaStateMachine with UseRetry like this:
configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>(
configure =>
{
configure.UseRetry(r =>
{
r.Handle<DbUpdateConcurrencyException>();
// This is the SQLServer error code for duplicate key
r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
r.Interval(5, TimeSpan.FromMilliseconds(100));
});
})
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
r.ExistingDbContext<MySagaDbContext>();
});
But with this UseRetry in AddSagaStateMachine nothing works, I just get loads of exception like this:
fail: MassTransit.ReceiveTransport[0]
R - FAULT activemq://artemis:61616/XXXX
System.ArgumentException: THe message could not be retrieved: IInitialSagaEvent(Parameter 'context')
at MassTransit.Saga.Pipeline.Pipes.SagaMergePipe`2.Send(SagaConsumeContext`1 context)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass5_0`1.<< Send > b__1 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass8_0.<< WithinTransaction > g__Create | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
I'm using .Net 6 and have tried MassTransit v 7.3.1 and v 8.0.0-develop.391, but both has the same behavior.
I've tried defining the messages as interfaces and publishing them both as anonymous classes and as actual implementations, and also tried to define the messages as classes, but with no luck.
My hope it that there is just some small configuration detail I'm missing, but I'm out of ideas, so any help is deeply appreciated.
The proper configuration in your SagaDefinition
is shown below. Note the use of UseMessageRetry
, instead of UseRetry
.
public class ExternalCheckRequestConsumerDefinition :
ConsumerDefinition<ExternalCheckRequestConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator) =>
endpointConfigurator.UseMessageRetry(r =>
{
r.Handle<DbUpdateConcurrencyException>();
// This is the SQLServer error code for duplicate key
r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
r.Interval(5, TimeSpan.FromMilliseconds(100));
});
}
The above Consumer definition isn't used by the saga. You'd need to create a Saga definition, and specify it when adding the saga, for the retry to apply to the saga. Which would do the same as configuring it inline when adding the saga:
.AddSagaStateMachine<MyStateMachine, MySaga, MySagaDefinition>(
Also, in your state machine, replace the overly noisy:
.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
With:
.PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))