I have two business domains (DomainA and DomainB) whose NServiceBus endpoints each use the SQL Server transport with their own database, hosted on separate servers.
My understanding was that messages can be sent or published from one domain to another by connecting the domains via the router, and the official documentation indicates that this is possible. To reproduce the problem, I took the update and publish sample and modified it so that both endpoints use the SQL Server transport and SQL Persistence.
The goal is to publish a message from DomainA and have it handled in DomainB.
DomainA (web app) - no router configuration since it only publishes something for the demo:
var endpointConfiguration = new EndpointConfiguration("DomainA-Endpoint");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(ConnectionStrings.DomainA);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(ConnectionStrings.DomainA);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();
endpointConfiguration.EnableInstallers();
DomainB - connects to router for published event:
var endpointConfiguration = new EndpointConfiguration("DomainB-Endpoint");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(ConnectionStrings.DomainB);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(ConnectionStrings.DomainB);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();
endpointConfiguration.EnableInstallers();
var routerConnector = transport.Routing().ConnectToRouter("DomainA-B-Router");
routerConnector.RegisterPublisher(
    eventType: typeof(OrderAccepted),
    publisherEndpointName: "DomainA-Endpoint");
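To make the goal concrete, here is a minimal sketch of what the subscriber side in DomainB might look like. The empty `OrderAccepted` class is a hypothetical stand-in for the sample's event type (the real one may carry properties not shown here), and the handler name and the console output are assumptions for illustration only.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical stand-in for the sample's event type; the real OrderAccepted
// lives in the sample's shared messages project and may have properties.
public class OrderAccepted : IEvent
{
}

// Handler hosted in DomainB-Endpoint. It should only be invoked if the router
// manages to forward the subscription to DomainA and the event back to DomainB.
public class OrderAcceptedHandler : IHandleMessages<OrderAccepted>
{
    public Task Handle(OrderAccepted message, IMessageHandlerContext context)
    {
        Console.WriteLine("DomainB received OrderAccepted");
        return Task.CompletedTask;
    }
}
```

On the DomainA side, the event would then be published with something like `endpointInstance.Publish(new OrderAccepted())`.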
And the router:
var routerConfig = new RouterConfiguration("DomainA-B-Router");
var domainAInterface = routerConfig.AddInterface<SqlServerTransport>("DomainA", t =>
{
    t.ConnectionString(ConnectionStrings.DomainA);
    t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});
var domainASqlSubscriptionStorage = new SqlSubscriptionStorage(
    () => new SqlConnection(ConnectionStrings.Router),
    "DomainA-", new SqlDialect.MsSqlServer(), null);
domainAInterface.EnableMessageDrivenPublishSubscribe(domainASqlSubscriptionStorage);
var domainBInterface = routerConfig.AddInterface<SqlServerTransport>("DomainB", t =>
{
    t.ConnectionString(ConnectionStrings.DomainB);
    t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});
var domainBSqlSubscriptionStorage = new SqlSubscriptionStorage(
    () => new SqlConnection(ConnectionStrings.Router),
    "DomainB-", new SqlDialect.MsSqlServer(), null);
domainBInterface.EnableMessageDrivenPublishSubscribe(domainBSqlSubscriptionStorage);
var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("DomainA", "DomainB");
staticRouting.AddForwardRoute("DomainB", "DomainA");
domainASqlSubscriptionStorage.Install().GetAwaiter().GetResult();
domainBSqlSubscriptionStorage.Install().GetAwaiter().GetResult();
routerConfig.AutoCreateQueues();
The code for starting the endpoints and the router is omitted for brevity.
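For readers who want to reproduce this, the omitted startup code could look roughly like the sketch below. It assumes the usual `Endpoint.Start` and `Router.Create` entry points; the `Hosting` class, its method names, and the console prompts are hypothetical and not taken from the sample.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Router;

// Hypothetical hosting helpers; the names are for illustration only.
public static class Hosting
{
    // Starts the router built from the RouterConfiguration shown above
    // and keeps it running until <enter> is pressed.
    public static async Task RunRouter(RouterConfiguration routerConfig)
    {
        var router = Router.Create(routerConfig);
        await router.Start().ConfigureAwait(false);

        Console.WriteLine("Router started. Press <enter> to exit.");
        Console.ReadLine();

        await router.Stop().ConfigureAwait(false);
    }

    // Starts an endpoint from one of the EndpointConfiguration objects above
    // and keeps it running until <enter> is pressed.
    public static async Task RunEndpoint(EndpointConfiguration endpointConfiguration)
    {
        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);

        Console.WriteLine("Endpoint started. Press <enter> to exit.");
        Console.ReadLine();

        await endpointInstance.Stop().ConfigureAwait(false);
    }
}
```

In the sample, the router and each endpoint run in their own process, so each host would call only one of these methods.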
The connection strings are as follows (note the different SQL instances for DomainA and DomainB):
public class ConnectionStrings
{
    public const string DomainA = @"Data Source=(local);Initial Catalog=Nsb-DomainA-Endpoint-DB;Integrated Security=True;Max Pool Size=100";
    public const string DomainB = @"Data Source=(localDB)\MSSQLLocalDB;Initial Catalog=Nsb-DomainB-Endpoint-DB;Integrated Security=True;Max Pool Size=100";
    public const string Router = @"Data Source=(local);Initial Catalog=Nsb-DomainA-B-Router-DB;Integrated Security=True;Max Pool Size=100";
}
When I run the sample, I get the following error in the router:
2019-07-10 11:46:08.889 WARN RepeatedFailuresCircuitBreaker The circuit breaker for DomainA-B-Router is now in the armed state
2019-07-10 11:46:15.955 WARN RepeatedFailuresCircuitBreaker The circuit breaker for DomainA-B-Router will now be triggered
2019-07-10 11:46:15.991 ERROR ThrottlingRawEndpointConfig`1[[NServiceBus.SqlServerTransport, NServiceBus.Transport.SqlServer, Version=4.0.0.0, Culture=neutral, PublicKeyToken=9fc386479f8a226c]] Persistent error while processing messages in DomainA-B-Router. Entering throttled mode.
NServiceBus.Unicast.Queuing.QueueNotFoundException: Failed to send message to [Nsb-DomainA-Endpoint-DB].[dbo].[DomainA-Endpoint] ---> System.Data.SqlClient.SqlException: Invalid object name 'Nsb-DomainA-Endpoint-DB.dbo.DomainA-Endpoint'.
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString, Boolean isInternal, Boolean forDescribeParameterEncryption, Boolean shouldCacheForAlwaysEncrypted)
at System.Data.SqlClient.SqlCommand.CompleteAsyncExecuteReader(Boolean isInternal, Boolean forDescribeParameterEncryption)
at System.Data.SqlClient.SqlCommand.InternalEndExecuteNonQuery(IAsyncResult asyncResult, String endMethod, Boolean isInternal)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueue.<SendRawMessage>d__10.MoveNext()
--- End of inner exception stack trace ---
at NServiceBus.Transport.SQLServer.TableBasedQueue.ThrowQueueNotFoundException(SqlException ex)
at NServiceBus.Transport.SQLServer.TableBasedQueue.<SendRawMessage>d__10.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<DispatchUsingReceiveTransaction>d__4.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<DispatchAsNonIsolated>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.MessageDispatcher.<Dispatch>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at PostroutingTerminator.<Terminate>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at ForwardSubscribeMessageDrivenRule.<Terminate>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at SubscribePreroutingTerminator.<Terminate>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at StorageDrivenSubscriptionRule.<Invoke>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at PreroutingToSubscribePreroutingFork.<Terminate>d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at ThrottlingRawEndpointConfig`1.<>c__DisplayClass1_0.<<PrepareConfig>b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.ReceiveStrategy.<TryProcessingMessage>d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.ProcessWithNativeTransaction.<TryProcess>d__3.MoveNext()
We can see that the router is trying to access the DomainA database, and debugging shows that it does so over the DomainB connection, which cannot work since the two databases are on different servers. If I change the DomainB connection string to point to the same SQL instance, everything works fine (provided the same user has access to all databases).
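The address in the error, `[Nsb-DomainA-Endpoint-DB].[dbo].[DomainA-Endpoint]`, names a catalog, schema and table but no server, so it can only resolve on whichever instance the current connection points to. As a rough diagnostic sketch, the check below compares the `Data Source` of two connection strings to tell whether such a cross-catalog address could resolve at all. The `InstanceCheck` class is hypothetical, and the comparison is deliberately naive: it is a plain text comparison, so aliases such as `.` and `(local)` would be reported as different instances.

```csharp
using System;
using System.Data.SqlClient;

// Hypothetical helper, not part of NServiceBus or the sample.
public static class InstanceCheck
{
    // Returns true only when both connection strings have the same
    // Data Source value (case-insensitive text comparison).
    public static bool SameInstance(string first, string second)
    {
        var a = new SqlConnectionStringBuilder(first);
        var b = new SqlConnectionStringBuilder(second);
        return string.Equals(a.DataSource, b.DataSource, StringComparison.OrdinalIgnoreCase);
    }

    public static void Main()
    {
        const string domainA = @"Data Source=(local);Initial Catalog=Nsb-DomainA-Endpoint-DB;Integrated Security=True";
        const string domainB = @"Data Source=(localDB)\MSSQLLocalDB;Initial Catalog=Nsb-DomainB-Endpoint-DB;Integrated Security=True";

        // A three-part name on the DomainB connection can only reach the
        // DomainA catalog when both strings target the same instance.
        Console.WriteLine(SameInstance(domainA, domainB));
    }
}
```

With the connection strings from this question, the check should report that the two domains are on different instances, which matches the `Invalid object name` error above.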
I thought the role of the router was to move messages between the instances, but I am failing to achieve that. Am I doing something wrong?
Why should the router and the endpoints use the same connection for subscription data?
Thanks for your help!
Full code is available here.
I downloaded the code and upgraded to the latest version of Router (3.8.1) and it worked without any issues -- the message has been published and delivered to both subscribers.