I'm having trouble getting a MassTransit (v7.3.1) saga with scheduled events to work with ActiveMQ Artemis 2.19.0 running in a Docker container. Everything runs without any exception, but the publishing of the scheduled event is not delayed, no matter what delay I specify.
I've tried reproducing the same use case using the test harness, but here everything works as expected.
UPDATE: I've added debug logging for MassTransit to get a better idea of the timeline.
Here is my test code, with one green test using the test harness, and one failing test using ActiveMQ.
using Automatonymous;
using MassTransit;
using MassTransit.ActiveMqTransport;
using MassTransit.Saga.InMemoryRepository;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using System;
using System.Threading.Tasks;
namespace Tests
{
/// <summary>
/// Timing constants shared by the saga definition and the tests in this file.
/// </summary>
public static class Config
{
    /// <summary>
    /// Delay, in milliseconds, that the state machine applies to the scheduled saga event.
    /// </summary>
    public const int scheduleDelayMillis = 2 * 1000;

    /// <summary>
    /// Factor by which the tests divide or multiply <see cref="scheduleDelayMillis"/>
    /// to derive how long they wait before each group of assertions.
    /// </summary>
    public const int testScheduleSleepFactor = 5;
}
/// <summary>
/// Message that the state machine accepts in its initial state; it carries
/// a <see cref="Guid"/> correlation id and no other data.
/// </summary>
public interface IInitialSagaEvent :
    CorrelatedBy<Guid>
{
}
/// <summary>
/// Message that the state machine schedules for delayed delivery to itself; it carries
/// a <see cref="Guid"/> correlation id and no other data.
/// </summary>
public interface IScheduledSagaEvent :
    CorrelatedBy<Guid>
{
}
/// <summary>
/// Saga instance data tracked for each correlation id.
/// </summary>
public class MySaga :
    SagaStateMachineInstance
{
    /// <summary>Identifier used to correlate incoming messages with this instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name of the state the instance is currently in.</summary>
    public string CurrentState { get; set; }

    /// <summary>
    /// Token id slot used by the state machine's schedule for the scheduled saga event.
    /// The tests expect it to be set while the event is pending and null once it has been received.
    /// </summary>
    public Nullable<Guid> ScheduledSagaEventTokenId { get; set; }
}
/// <summary>
/// State machine that reacts to <see cref="IInitialSagaEvent"/> by scheduling an
/// <see cref="IScheduledSagaEvent"/> and then waits for that scheduled event to arrive.
/// </summary>
public class MyStateMachine :
    MassTransitStateMachine<MySaga>
{
    /// <summary>
    /// Declares the state property, the schedule (delay and correlation) and the two transitions.
    /// </summary>
    public MyStateMachine()
    {
        InstanceState(saga => saga.CurrentState);

        Schedule(
            () => ScheduledSagaEvent,
            saga => saga.ScheduledSagaEventTokenId,
            settings =>
            {
                settings.Delay = TimeSpan.FromMilliseconds(Config.scheduleDelayMillis);
                settings.Received = received => received.CorrelateById(ctx => ctx.Message.CorrelationId);
            });

        // Initial event: schedule the delayed event for this instance, then wait for it.
        var onInitialEvent = When(InitialSagaEvent)
            .Schedule(
                ScheduledSagaEvent,
                ctx => ctx.Init<IScheduledSagaEvent>(new { CorrelationId = ctx.Instance.CorrelationId }))
            .TransitionTo(AwaitingScheduledEvent);
        Initially(onInitialEvent);

        // Scheduled event received while waiting: move to the terminal test state.
        var onScheduledEventReceived = When(ScheduledSagaEvent.Received)
            .TransitionTo(ScheduledEventReceived);
        During(AwaitingScheduledEvent, onScheduledEventReceived);
    }

    /// <summary>Event that starts a new saga instance.</summary>
    public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }

    /// <summary>State entered after the scheduled event has been requested.</summary>
    public State AwaitingScheduledEvent { get; private set; }

    /// <summary>State entered once the scheduled event has been received.</summary>
    public State ScheduledEventReceived { get; private set; }

    /// <summary>Schedule definition for <see cref="IScheduledSagaEvent"/>.</summary>
    public Schedule<MySaga, IScheduledSagaEvent> ScheduledSagaEvent { get; private set; }
}
/// <summary>
/// Verifies that the saga's scheduled event is delivered only after the configured delay:
/// once against the in-memory test harness and once against an ActiveMQ Artemis broker.
/// </summary>
public class SagaTests
{
    /// <summary>
    /// Logging setup shared by both tests: MassTransit categories at Debug level, written to the
    /// console with UTC timestamps at millisecond resolution.
    /// </summary>
    /// <param name="configure">Logging builder supplied by <c>AddLogging</c>.</param>
    private static void ConfigureLogging(ILoggingBuilder configure)
    {
        configure.AddFilter("MassTransit", LogLevel.Debug);
        configure.AddSimpleConsole(options =>
        {
            options.UseUtcTimestamp = true;
            options.TimestampFormat = "HH:mm:ss.fff ";
        });
    }

    /// <summary>
    /// Publishes the initial event on the in-memory harness and checks the saga state
    /// shortly after publishing and again after the schedule delay has elapsed.
    /// </summary>
    [Test]
    public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_TestHarness()
    {
        var provider = new ServiceCollection()
            .AddMassTransitInMemoryTestHarness(cfg =>
            {
                cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
                cfg.AddSagaStateMachineTestHarness<MyStateMachine, MySaga>();
            })
            .AddLogging(ConfigureLogging)
            .BuildServiceProvider(true);
        var harness = provider.GetRequiredService<InMemoryTestHarness>();
        harness.OnConfigureInMemoryBus += configurator =>
        {
            configurator.UseDelayedMessageScheduler();
        };
        await harness.Start();
        try
        {
            var sagaId = NewId.NextGuid();
            var sagaHarness = provider.GetRequiredService<IStateMachineSagaTestHarness<MySaga, MyStateMachine>>();
            await harness.Bus.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });

            // Wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent
            // (a fixed Task.Delay is used so this test follows the same logic as the ActiveMq test)
            await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);
            var saga = sagaHarness.Created.Contains(sagaId);
            Assert.That(saga, Is.Not.Null);

            // Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
            Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));
            Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);

            // Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
            await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);
            saga = sagaHarness.Sagas.Contains(sagaId);
            Assert.That(saga, Is.Not.Null);

            // Checks to verify that the IScheduledSagaEvent has been published/consumed
            Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
            Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
        }
        finally
        {
            await harness.Stop();
            await provider.DisposeAsync();
        }
    }

    /// <summary>
    /// Same scenario as the harness test, but over the ActiveMQ transport with Artemis compatibility
    /// enabled. Connects to host "artemis" on port 61616 with the admin/admin credentials configured below.
    /// </summary>
    [Test]
    public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq()
    {
        var provider = new ServiceCollection()
            .AddMassTransit(cfg =>
            {
                cfg.AddDelayedMessageScheduler();
                cfg.UsingActiveMq((context, config) =>
                {
                    config.Host("artemis", 61616, configureHost =>
                    {
                        configureHost.Username("admin");
                        configureHost.Password("admin");
                    });
                    config.EnableArtemisCompatibility();
                    config.UseDelayedMessageScheduler();
                    config.ConfigureEndpoints(context);
                });
                cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
            })
            .AddLogging(ConfigureLogging)
            .BuildServiceProvider(true);
        var busControl = provider.GetRequiredService<IBusControl>();
        await busControl.StartAsync();
        try
        {
            var sagaId = NewId.NextGuid();
            var sagaRepo = provider.GetRequiredService<IndexedSagaDictionary<MySaga>>();
            await busControl.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });

            // Must wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent using ActiveMq
            await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);
            Assert.That(sagaRepo.Count, Is.EqualTo(1));
            var sagaInstance = sagaRepo[sagaId];
            Assert.That(sagaInstance, Is.Not.Null);
            var saga = sagaInstance.Instance;
            Assert.That(saga, Is.Not.Null);

            // Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
            Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent))); // <-- Line 166 that fails
            Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);

            // Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
            await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);
            sagaInstance = sagaRepo[sagaId];
            Assert.That(sagaInstance, Is.Not.Null);
            saga = sagaInstance.Instance;
            Assert.That(saga, Is.Not.Null);

            // Checks to verify that the IScheduledSagaEvent has been published/consumed
            Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
            Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
        }
        finally
        {
            await busControl.StopAsync();
            await provider.DisposeAsync();
        }
    }
}
}
The test Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq
fails at line 166:
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));
This is where I want to verify that the scheduled event has not been delivered to the saga yet.
Here is the log from the test
Message:
String lengths are both 22. Strings differ at index 0.
Expected: "AwaitingScheduledEvent"
But was: "ScheduledEventReceived"
-----------^
Stack Trace:
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 168
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 187
GenericAdapter`1.BlockUntilCompleted()
NoMessagePumpStrategy.WaitForCompletion(AwaitAdapter awaiter)
AsyncToSyncAdapter.Await(Func`1 invoke)
TestMethodCommand.RunTestMethod(TestExecutionContext context)
TestMethodCommand.Execute(TestExecutionContext context)
<>c__DisplayClass4_0.<PerformWork>b__0()
<>c__DisplayClass1_0`1.<DoIsolated>b__0(Object _)
Standard Output:
15:19:07.714 info: MassTransit[0]
Configured endpoint My, Saga: Tests.MySaga, State Machine: Tests.MyStateMachine
15:19:07.802 dbug: MassTransit[0]
Starting bus: activemq://artemis:61616/
15:19:07.823 dbug: MassTransit[0]
Connect: admin@artemis:61616
15:19:08.003 dbug: MassTransit[0]
Connected: admin@artemis:61616 (client-id: ID:LNOR012579-63177-637823531478654470-0:0, version: 1.8.0.0)
15:19:08.025 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.025 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.026 dbug: MassTransit[0]
Endpoint Ready: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.027 dbug: MassTransit[0]
Get queue name: My, durable
15:19:08.027 dbug: MassTransit[0]
Get queue name: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.027 dbug: MassTransit[0]
Get queue name: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.039 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: My?consumer.prefetchSize=24
15:19:08.041 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent?consumer.prefetchSize=24
15:19:08.042 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent?consumer.prefetchSize=24
15:19:08.044 dbug: MassTransit[0]
Consumers Ready: activemq://artemis:61616/My
15:19:08.044 dbug: MassTransit[0]
Endpoint Ready: activemq://artemis:61616/My
15:19:08.048 info: MassTransit[0]
Bus started: activemq://artemis:61616/
15:19:08.096 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.158 dbug: MassTransit[0]
SEND activemq://artemis:61616/VirtualTopic.Tests.IInitialSagaEvent?type=topic 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent
15:19:08.207 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Created Tests.IInitialSagaEvent
15:19:08.213 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Added Tests.IInitialSagaEvent
15:19:08.231 dbug: MassTransit.ReceiveTransport[0]
Create send transport: activemq://artemis:61616/My
15:19:08.233 dbug: MassTransit.ReceiveTransport[0]
Get queue name: My, durable
15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)
15:19:08.610 dbug: MassTransit[0]
Stopping bus: activemq://artemis:61616/
15:19:08.615 dbug: MassTransit[0]
Endpoint Stopping: activemq://artemis:61616/My
15:19:08.615 dbug: MassTransit[0]
Stopping receive transport: activemq://artemis:61616/My
15:19:08.624 dbug: MassTransit[0]
Stopping send transport: My
15:19:08.625 dbug: MassTransit[0]
Endpoint Completed: activemq://artemis:61616/My
15:19:08.625 dbug: MassTransit[0]
Endpoint Stopping: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
Stopping receive transport: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
Consumer completed activemq://artemis:61616/My: 2 received, 1 concurrent
15:19:08.625 dbug: MassTransit[0]
Stopping send transport: VirtualTopic.Tests.IInitialSagaEvent
15:19:08.626 dbug: MassTransit[0]
Endpoint Completed: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.638 dbug: MassTransit[0]
Disconnect: admin@artemis:61616
15:19:08.644 dbug: MassTransit[0]
Disconnected: admin@artemis:61616
15:19:08.645 info: MassTransit[0]
Bus stopped: activemq://artemis:61616/
From the log it seems like the message is already delivered to the saga about 10 ms after it is sent?
15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)
I think I might be missing some configuration piece somewhere, but I have run out of ideas, so any hint is deeply appreciated.
Behind the scenes MassTransit uses the OpenWire protocol to communicate with ActiveMQ Artemis. Currently ActiveMQ Artemis doesn't support the AMQ_SCHEDULED_DELAY
header used by such OpenWire clients. I have opened ARTEMIS-3711 to deal with this and sent a pull request. This should be fixed in ActiveMQ Artemis 2.21.0 which should go up for a vote near the end of this month.