When upgrading from MassTransit 7.3 to MassTransit 8.0.15, I found what seems to be a breaking change, and I would like to know whether anyone else has run into it.
I have a saga that schedules a job restart every 30 seconds:
Schedule(() => RestartConversion, instance => instance.RestartConversionTokenId, s =>
{
    s.Delay = TimeSpan.FromSeconds(30);
    s.Received = r => r.CorrelateById(context => context.Message.ItemId);
});
However, in my unit test under version 7.3 I was able to trigger this event manually by publishing a RestartConversionEvent, which would trigger When(RestartConversion.Received).
var endPoint = await Harness.Bus.GetSendEndpoint(Harness.InputQueueAddress);
await endPoint.Send(new DocumentConversionSaga.RestartConversionSchedule
{
    ItemId = Guid.Parse(uploadDocumentInfo.ItemId),
    ConversionJobInstanceId = conversionInstanceId,
    PreviewJobInstanceId = previewInstanceId
});
This no longer seems to work in MassTransit 8: the manually sent message does not trigger the event, although if I wait the 30 seconds I can see that the RestartConversion event is sent automatically.
Is there any workaround for this issue?
Harness Configuration:
.AddMassTransitInMemoryTestHarness(cfg =>
{
    cfg.SetInMemorySagaRepositoryProvider();
    cfg.AddSagaStateMachine<DocumentConversionSaga, DocumentConversionState>().InMemoryRepository();
    cfg.AddRequestClient<StartConversionCommand>();
})
Harness = provider.GetRequiredService<InMemoryTestHarness>();
Harness.OnConfigureBus += cfg =>
{
    cfg.UseDelayedMessageScheduler();
    BusTestFixture.ConfigureBusDiagnostics(cfg);
    cfg.UseMessageData(new InMemoryMessageDataRepository());
    cfg.UseNewtonsoftJsonSerializer();
};
I also tried switching from the in-memory test harness to the following, but the result is exactly the same.
.AddMassTransitTestHarness(cfg =>
{
    cfg.SetInMemorySagaRepositoryProvider();
    cfg.AddSagaStateMachine<DocumentConversionSaga, DocumentConversionState>().InMemoryRepository();
    cfg.AddRequestClient<StartConversionCommand>();
    cfg.UsingInMemory((ctx, x) =>
    {
        x.UseDelayedMessageScheduler();
        BusTestFixture.ConfigureBusDiagnostics(x);
        x.UseMessageData(new InMemoryMessageDataRepository());
        x.UseNewtonsoftJsonSerializer();
        x.ConfigureEndpoints(ctx);
    });
})
You should always review the logs to understand exactly what's happening under the covers, but if I had to guess, you are sending an event to the saga that was not expected. If the saga didn't schedule the message (and keep the scheduled message's tokenId in the saga instance), it will ignore the message as not being current. This avoids the "I unscheduled the message but it arrived anyway" scenario, which is a race condition between the scheduled message arriving and the message scheduler's pending cancellation.
For it to work, you'd likely have to store the tokenId in the saga instance and send your "scheduled" event including that tokenId.
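To make the idea concrete, here is a small, library-free model of the token check described above. It is only a sketch of the guess in this answer, not MassTransit's actual implementation: SagaInstanceModel, ScheduledMessageModel, and IsCurrent are hypothetical names made up for illustration, and only the RestartConversionTokenId property name comes from the question.

```csharp
using System;

// Hypothetical stand-in for the saga instance; it remembers the token
// of the message it most recently scheduled (null if nothing is scheduled).
public class SagaInstanceModel
{
    public Guid? RestartConversionTokenId { get; set; }
}

// Hypothetical stand-in for an incoming "scheduled" message and the token it carries.
public class ScheduledMessageModel
{
    public Guid TokenId { get; set; }
}

public static class ScheduleTokenModel
{
    // A message is treated as current only when its token matches
    // the token stored on the saga instance.
    public static bool IsCurrent(SagaInstanceModel instance, ScheduledMessageModel message)
    {
        return instance.RestartConversionTokenId.HasValue
            && instance.RestartConversionTokenId.Value == message.TokenId;
    }
}

public static class Program
{
    public static void Main()
    {
        var currentToken = Guid.NewGuid();
        var instance = new SagaInstanceModel { RestartConversionTokenId = currentToken };

        // A message carrying some other token, e.g. one that was unscheduled earlier.
        var stale = new ScheduledMessageModel { TokenId = Guid.NewGuid() };

        // A message carrying the token the saga instance stored.
        var current = new ScheduledMessageModel { TokenId = currentToken };

        Console.WriteLine(ScheduleTokenModel.IsCurrent(instance, stale));   // False
        Console.WriteLine(ScheduleTokenModel.IsCurrent(instance, current)); // True
    }
}
```

In a test, this would translate to reading the token from the saga instance after it has scheduled the message and attaching that same token to the manually sent event. How the token is attached depends on the MassTransit version, so check the logs and the library source before relying on this.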