We are using NServiceBus to propagate messages about data changes in one system to other systems in our environment.
When we get an exception in one of our client (receiving) systems we would like to stop processing messages without removing the failing message from the queue, so that we can investigate the issue and pick up from the same place in the queue.
What's the best way to accomplish this?
EDIT: Here's my current setup:
    Configure.With()
        .DefiningCommandsAs(t => typeof(ICommand).IsAssignableFrom(t))
        .DefiningEventsAs(t => typeof(IEvent).IsAssignableFrom(t))
        .DefiningMessagesAs(t => typeof(IMessage).IsAssignableFrom(t))
        .Log4Net<NlogAppenderForLog4Net>(a => { })
        .NinjectBuilder(Kernel)
        .XmlSerializer()
        .MsmqTransport()
        .DefineEndpointName("consumer.input")
        .IsTransactional(true)
        .PurgeOnStartup(false)
        .MsmqSubscriptionStorage("consumer")
        .UnicastBus()
        .LoadMessageHandlers()
        .ImpersonateSender(false)
        .CreateBus()
        .Start(() => Configure.Instance
            .ForInstallationOn<NServiceBus.Installation.Environments.Windows>()
            .Install());
You'll probably want to implement the IManageMessageFailures interface. This interface gives you access to the exception and the message:
    public interface IManageMessageFailures
    {
        void SerializationFailedForMessage(TransportMessage message, Exception e);
        void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e);
    }
If your fault manager itself fails, NServiceBus calls the critical-error action specified in your configuration, which you set via DefineCriticalErrorAction(Action onCriticalError). The default action sets the number of worker threads to 0, which lets you spin down the process. You may want to do the same thing directly in your fault manager. You should be able to tap into the transport layer to add the threads back once you have investigated.
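As a rough illustration of the fault-manager idea, here is a minimal sketch that implements the two methods quoted above. The class name `HaltingFaultManager` and the injected `haltProcessing` delegate are hypothetical; the delegate stands in for whatever you use to stop the endpoint (for example, dropping the worker threads to 0), and the `using` namespaces are assumptions that may differ by NServiceBus version. Some versions of the interface may declare additional members that you would also need to implement.

```csharp
using System;
using NServiceBus.Faults;             // assumed namespace of IManageMessageFailures
using NServiceBus.Unicast.Transport;  // assumed namespace of TransportMessage

// Hypothetical fault manager: logs the failure, then asks the host to stop processing.
public class HaltingFaultManager : IManageMessageFailures
{
    // Hypothetical hook supplied by the host; it decides how processing is stopped.
    private readonly Action haltProcessing;

    public HaltingFaultManager(Action haltProcessing)
    {
        if (haltProcessing == null) throw new ArgumentNullException("haltProcessing");
        this.haltProcessing = haltProcessing;
    }

    public void SerializationFailedForMessage(TransportMessage message, Exception e)
    {
        Halt("Serialization failed", message, e);
    }

    public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e)
    {
        Halt("Processing failed", message, e);
    }

    private void Halt(string reason, TransportMessage message, Exception e)
    {
        // Record which message failed and why before stopping.
        Console.Error.WriteLine("{0} for message {1}: {2}", reason, message.Id, e);
        haltProcessing();
    }
}
```

You would still need to register this class (and the delegate) with your container so that NServiceBus picks it up in place of the default fault manager. The sketch only covers stopping; whether the failing message stays in the input queue depends on how the receive transaction is handled, which it does not address.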