Newbie question: what am I missing? Are there any .NET Core 2.2 Saga examples available?
I have a basic end-to-end system working with messages flowing across containers in docker-compose, but adding a Saga is proving to be a challenge.
Q. Am I missing a scheduler dependency? In MassTransit 5.5.5, cfg.UseInMemoryMessageScheduler(); doesn't compile.
Something odd was also going on: I had to mark my state machine explicitly as ISaga.
At startup I get the following error:
MassTransit.ConfigurationException: Failed to create the state machine connector for Model.WorkflowExecutionStateMachine ---> MassTransit.ConfigurationException: The state machine was not properly configured:
workflowapi_1 | [Failure] ExecutingTask was not specified
public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
    // Register MassTransit
    services.AddMassTransit(x =>
    {
        x.AddConsumer<WorkflowTaskConsumer>();
        // required?
        x.AddSaga<WorkflowExecutionSaga>();
        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var rabbitMQHostName = $"rabbitmq://{configuration["RabbitMQHostName"]}";
            Console.Out.WriteLineAsync($"Starting Workflow Receiver... {rabbitMQHostName}/{QueueNames.ExeuteWorkflowTaskQueue}");
            var host = cfg.Host(new Uri(rabbitMQHostName), hostConfig =>
            {
                hostConfig.Username("guest");
                hostConfig.Password("guest");
            });
            // A basic message works OK
            cfg.ReceiveEndpoint(host, QueueNames.ExeuteWorkflowTaskQueue, ep =>
            {
                ep.PrefetchCount = 1;
                ep.UseMessageRetry(mr => mr.Interval(1000, 2));
                ep.ConfigureConsumer<WorkflowTaskConsumer>(provider);
            });
            // Doesn't like this
            cfg.ReceiveEndpoint(host, QueueNames.WorkflowStateMachineSagaQueueName, ep =>
            {
                ep.PrefetchCount = 1;
                ep.UseMessageRetry(mr => mr.Interval(1000, 2));
                ep.StateMachineSaga(new WorkflowExecutionSaga(), new InMemorySagaRepository<WorkflowExecutionStateMachine>());
            });
        }));
        cfg.UseInMemoryMessageScheduler(); // doesn't compile!
    });
}
The bus is started as follows:
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }
    else
    {
        // The default HSTS value is 30 days. You may want to change this for production scenarios,
        // see https://aka.ms/aspnetcore-hsts.
        app.UseHsts();
    }
    app.UseMvc();
    var bus = app.ApplicationServices.GetService<IBusControl>();
    var busHandle = TaskUtil.Await(() =>
    {
        return bus.StartAsync();
    });
    lifetime.ApplicationStopping.Register(() =>
    {
        busHandle.Stop();
    });
}
Exception details are
Unhandled Exception: MassTransit.ConfigurationException: Failed to create the state machine connector for Rapid.Workflow.Api.Model.WorkflowExecutionStateMachine ---> MassTransit.ConfigurationException: The state machine was not properly configured:
workflowapi_1 | [Failure] ExecutingTask was not specified
workflowapi_1 | at Automatonymous.StateMachineConfigurationResult.CompileResults(IEnumerable`1 results)
workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector`1.StateMachineEvents()+MoveNext()
workflowapi_1 | at System.Collections.Generic.List`1.AddEnumerable(IEnumerable`1 enumerable)
workflowapi_1 | at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector`1..ctor(SagaStateMachine`1 stateMachine)
workflowapi_1 | --- End of inner exception stack trace ---
workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector`1..ctor(SagaStateMachine`1 stateMachine)
workflowapi_1 | at Automatonymous.SagaConfigurators.StateMachineSagaConfigurator`1..ctor(SagaStateMachine`1 stateMachine, ISagaRepository`1 repository, ISagaConfigurationObserver observer)
workflowapi_1 | at MassTransit.AutomatonymousReceiveEndpointExtensions.StateMachineSaga[TInstance](IReceiveEndpointConfigurator configurator, SagaStateMachine`1 stateMachine, ISagaRepository`1 repository, Action`1 configure)
workflowapi_1 | at Rapid.Workflow.Api.Startup.<>c.b__2_5(IRabbitMqReceiveEndpointConfigurator ep) in /src/Workflow.Api/Startup.cs:line 74
Dependencies are
<PackageReference Include="Automatonymous" Version="4.1.6" />
<PackageReference Include="MassTransit" Version="5.5.5" />
<PackageReference Include="MassTransit.RabbitMQ" Version="5.5.5" />
<PackageReference Include="MassTransit.AspNetCore" Version="5.5.5" />
<PackageReference Include="MassTransit.Automatonymous" Version="5.5.5" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="5.5.5" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
Thanks for any tips or ideas -
This error seems to crop up because the Saga class had declared some as-yet-unused (but public) events. DOH!
The solution was to remove the unused events from the Saga:
// Uncommenting this unused event makes startup fail:
// public Event<ISatelliteTaskRequest> UnusedEvent { get; private set; }
After looking at this sample https://github.com/selcukusta/masstransit-saga-implementation and cutting my Program.cs back to basics, I was still getting the error. So it was not a container / IoC / Startup issue.
Next, looking at the source for the MassTransit error message (https://github.com/MassTransit/MassTransit/blob/master/src/MassTransit.AutomatonymousIntegration/Configuration/StateMachineConnectors/StateMachineConnector.cs), I realised that the correlating code is possibly reflecting on all public members of the Saga, including events that are declared but never configured.
So, removing unused events from the Saga class fixes the issue.
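For reference, here is a minimal sketch of a state machine in which every declared event is also configured. It rests on an assumption I have not confirmed against the 5.5.5 source: that validation fails for any public Event<T> property with no correlation specified, in which case giving the event a correlation is an alternative to deleting it. WorkflowInstance, ITaskStarted, WorkflowId, WorkflowStateMachine and Running are hypothetical names for illustration, not the types from the question.

```csharp
using System;
using Automatonymous;

// Hypothetical message contract; WorkflowId identifies the saga instance.
public interface ITaskStarted
{
    Guid WorkflowId { get; }
}

// Hypothetical saga instance (the state that gets stored per workflow).
public class WorkflowInstance : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}

// Hypothetical state machine: every public Event<T> declared below
// is given a correlation and is used in at least one behavior.
public class WorkflowStateMachine : MassTransitStateMachine<WorkflowInstance>
{
    public WorkflowStateMachine()
    {
        // Store the current state name on the instance.
        InstanceState(x => x.CurrentState);

        // Tell MassTransit how to find the saga instance for this event.
        Event(() => TaskStarted,
            e => e.CorrelateById(ctx => ctx.Message.WorkflowId));

        Initially(
            When(TaskStarted)
                .TransitionTo(Running));
    }

    public State Running { get; private set; }

    public Event<ITaskStarted> TaskStarted { get; private set; }

    // An extra public Event<T> property with no Event(...) call above
    // is the kind of declaration that triggered the error in my case.
}
```

It would be wired up the same way as in the question, for example ep.StateMachineSaga(new WorkflowStateMachine(), new InMemorySagaRepository<WorkflowInstance>());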