We use SQL Service Broker queues to notify our application of new records that meet certain criteria being added to a table in another application's database. This is accomplished by having an after insert trigger run a query with for xml against the inserted virtual table and insert any results into a specific Service Broker queue. We then have a Notifier object that receives from the Service Broker queue and invokes a callback for each message received. Our code to receive from the Service Broker queue is as follows:
let receiveXmlMessage connection transaction (cancellation: CancellationToken) queueName messageTypeName =
    task {
        let commandTimeout = if cancellation.IsCancellationRequested then 1 else 0
        let receiveQuery =
            sprintf """WAITFOR
                (
                    RECEIVE TOP(1)
                        @message = CONVERT(xml, message_body),
                        @messageType = message_type_name,
                        @dialogId = conversation_handle
                    FROM dbo.[%s]
                ), TIMEOUT 60000;""" (sanitize queueName)
        use receiveCommand =
            match transaction with
            | Some tx -> new SqlCommand(receiveQuery, connection, tx, CommandTimeout = commandTimeout)
            | None -> new SqlCommand(receiveQuery, connection, CommandTimeout = commandTimeout)
        receiveCommand.Parameters.AddRange([| SqlParameter("@message", SqlDbType.Xml, Direction = ParameterDirection.Output);
                                              SqlParameter("@messageType", SqlDbType.NVarChar, Direction = ParameterDirection.Output, Size = 256);
                                              SqlParameter("@dialogId", SqlDbType.UniqueIdentifier, Direction = ParameterDirection.Output); |])
        try
            let! receiveResult = receiveCommand.ExecuteNonQueryAsync(if commandTimeout = 0 then cancellation else CancellationToken.None)
            if receiveResult > 0
            then let messageType = receiveCommand.Parameters.["@messageType"].Value |> unbox<string>
                 let dialogId = receiveCommand.Parameters.["@dialogId"].Value |> unbox<Guid>
                 if messageType = messageTypeName
                 then do! endConversation connection transaction dialogId
                      return receiveCommand.Parameters.["@message"].Value |> unbox<string> |> XDocument.Parse
                 else return XDocument()
            else return XDocument()
        with | ex ->
            log.errorxf ex "Failed to receive message from Service Broker Queue %s" queueName
            return! Task.FromException ex
    }
This was working fine for several months, processing millions of messages, until a few days ago, when we had another process cause extensive blocking on the database we monitor and our DBAs had to terminate several database sessions to relieve the contention. Ever since this incident, our application has encountered the following error when attempting to receive from the Service Broker queue:
2018-01-11 07:50:27.183-05:00 [31] ERROR - Failed to receive message from Service Broker Queue Notifier_Queue
System.Data.SqlClient.SqlException (0x80131904): A severe error occurred on the current command. The results, if any, should be discarded.
Operation cancelled by user.
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString)
at System.Data.SqlClient.SqlCommand.CompleteAsyncExecuteReader()
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Application.Common.Sql.ServiceBroker.receiveXmlMessage@257-3.Invoke(Unit unitVar0)
at Application.Common.TaskBuilder.tryWith[a](FSharpFunc`2 step, FSharpFunc`2 catch)
New messages are successfully added to the queue, and we are able to receive messages from the same queue using SSMS, or even an F# interactive session running as the same user as the application. It only appears to be our application that is affected, but it does seem to affect all instances of our application, on different servers, as long as they are connected to this specific database. We have tried restarting the application and the SQL Server, and we have tried running an ALTER DATABASE ... SET NEW_BROKER WITH ROLLBACK IMMEDIATE. Nothing we've tried has made a difference: we still encounter the same exception, and we have hundreds of thousands of conversations remaining in the CONVERSING status, since our code to call END CONVERSATION is only invoked after successfully receiving a message.
Our SQL Service Broker queues are set up to model the monologue pattern as described in this blog post.
How can we diagnose the cause of this rather non-specific exception that our application is returning from SQL Server? Is there anything else we could try to diagnose and/or correct whatever changed between our application and SQL Service Broker when the problem first occurred?
We finally figured out what was causing this error when we tried to receive from the Service Broker queue. As it turns out, the CancellationToken that is passed in to our receiveXmlMessage function was being cancelled by other logic in our application that monitors the number of conversing conversations and attempts to recreate our Notifier object if the number of conversing conversations exceeds a certain threshold and the most recently closed conversation is older than another threshold. Due to a bug in the logic for the age of the most recently closed conversation, effectively only the number of conversing conversations was being used to reset the Notifier, and when the DB blocking occurred last week, over 150,000 conversing conversations accumulated. This caused our application to continually cancel the CancellationToken while we were trying to receive messages from Service Broker. Once we took our application down, cleaned up all the conversing conversations, and fixed the bug in the date math for the last closed conversation, the error stopped occurring.
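As an illustration of the intended reset condition, here is a minimal sketch of it as a pure predicate, which makes the date math easy to test on its own. Everything in it is an assumption for the sake of the example: the name shouldResetNotifier, its parameters, the two threshold values, and the handling of the case where no conversation has been closed yet are not taken from the real application.

```fsharp
open System

// Hypothetical thresholds; the real values are not stated above.
let maxConversing = 1000
let maxIdle = TimeSpan.FromMinutes 10.0

/// Returns true only when BOTH conditions hold: more than maxConversing
/// conversations are still conversing AND the most recently closed
/// conversation is older than maxIdle.
let shouldResetNotifier (now: DateTime) (conversingCount: int) (lastClosedAt: DateTime option) =
    let idleTooLong =
        match lastClosedAt with
        | Some closedAt -> now - closedAt > maxIdle
        | None -> true // assumption: nothing closed yet counts as idle
    conversingCount > maxConversing && idleTooLong
```

With a predicate like this, a large backlog alone (such as the 150,000 conversing conversations mentioned above) would not trigger a reset as long as conversations were still being closed recently.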
It may be useful to note, for anyone encountering the message:
A severe error occurred on the current command. The results, if any, should be discarded.
This can be the result of the CancellationToken passed to the ExecuteNonQueryAsync/ExecuteReaderAsync/ExecuteScalarAsync/ExecuteXmlReaderAsync method on the SqlCommand being cancelled while the method is executing.
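One way to make this easier to spot in logs is to check the token whenever the command fails, and surface the failure as a cancellation rather than as a generic SQL error. The following is only a sketch under assumptions: runDistinguishingCancellation is a hypothetical helper, not part of our code or of SqlClient, and it assumes the built-in task expression from F# 6 or later rather than the TaskBuilder library that appears in the stack trace above.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: runs `operation` with the given token. If the
/// operation throws after the token has been cancelled, the original
/// exception is wrapped in an OperationCanceledException so that callers
/// and logs can tell a cancellation apart from a genuine failure.
/// Exceptions thrown while the token is not cancelled propagate unchanged.
let runDistinguishingCancellation
        (cancellation: CancellationToken)
        (operation: CancellationToken -> Task<'T>) : Task<'T> =
    task {
        try
            return! operation cancellation
        with ex when cancellation.IsCancellationRequested ->
            return raise (OperationCanceledException("Command was cancelled by the caller's token.", ex, cancellation))
    }
```

In a function like receiveXmlMessage, the operation argument could be a lambda such as fun ct -> receiveCommand.ExecuteNonQueryAsync ct, with the original SqlException still available as the InnerException.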