Search code examples
sql-serversql-server-2008service-broker

SQL Message Broker leaving messages in Sending Queue


We're putting messages into a SQL Server message queue, by a trigger on a table. (When a field is updated, we build some XML, and call the trigger below).

CREATE PROCEDURE [dbo].[up_CarePay_BrokerSendXml] 
    -- Add the parameters for the stored procedure here
    @Data VARCHAR(MAX) 

AS
BEGIN

    DECLARE @InitDlgHandle UNIQUEIDENTIFIER
    DECLARE @RequestMessage VARCHAR(1000) 
    BEGIN TRY
          BEGIN TRAN

                BEGIN DIALOG CONVERSATION @InitDlgHandle 
                FROM SERVICE [//IcmsCarePay/Service/Initiator]
                TO SERVICE N'//IcmsCarePay/Service/Target'
                ON CONTRACT [//IcmsCarePay/Contract]
                WITH ENCRYPTION = OFF;

                SEND ON CONVERSATION @InitDlgHandle
                MESSAGE TYPE [//IcmsCarePay/Message/Request] (@Data);

          COMMIT TRAN;
    END TRY
    BEGIN CATCH
          ROLLBACK TRAN;
          DECLARE @Message VARCHAR(MAX);
          SELECT @Message = ERROR_MESSAGE();
          PRINT @Message
    END CATCH;

END

This works. A message is placed in the queue.

The message is then sent to the receiving queue on the same server - different database. We then run a proc every minute, which grabs the message from the target queue, and processes it into a staging table for processing. The message is then out of the target queue, and this all works without error.

However...

When I check the initiaitor queue, where the message came from, it's filling up with message.

SELECT TOP 1000 *, casted_message_body = 
CASE message_type_name WHEN 'X' 
  THEN CAST(message_body AS NVARCHAR(MAX)) 
  ELSE message_body 
END 
FROM [ICMS].[dbo].[IcmsCarePayInitiatorQueue] WITH(NOLOCK)

I'd have thought that when the message went from the initiator, to the target, the initiator would disappear. But it seems to be filling up.

I note that the messages in the initiator have a 'message_type_id' of 2, a 'validation' of 'E' and message body and casted message body are NULL. There all have a message_type_name of 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'.

On the destination database side, here is the procedure used to get the messages from the queue:

CREATE PROCEDURE [dbo].[up_CarePayBrokerReceiveXml]   
AS
BEGIN  
  SET NOCOUNT ON;  

  DECLARE @XML XML, @Response XML = 'OK', @ConversationHandle UNIQUEIDENTIFIER, @message_type_name SYSNAME, @message_body VARBINARY(MAX), @source_table VARCHAR(100)
  DECLARE @Message VARCHAR(MAX), @Line INT, @Proc VARCHAR(MAX), @Exception VARCHAR(MAX)  

  WHILE ( 1 = 1 )
  BEGIN  
    -- Clear variables, as they may have been populated in previous loop.
    SET @message_type_name = NULL
    SET @message_body = NULL
    SET @ConversationHandle = NULL  
    SET @source_table = NULL

    BEGIN TRY 
      BEGIN TRAN

        WAITFOR (    -- Pop off a message at a time, and add to storage table.
           RECEIVE TOP (1) 
               @message_type_name = message_type_name  
             , @message_body = message_body  
             , @ConversationHandle = conversation_handle  
             , @source_table = CAST([message_body] AS XML).value('(/row/@SourceTable)[1]', 'varchar(50)')  
           FROM dbo.IcmsCarePayTargetQueue  
        ), TIMEOUT 3000;  

        IF @@ROWCOUNT = 0
        BEGIN  
          ROLLBACK  -- Complete the Transaction (Rollback, as opposeed to Commit, as there is nothing to commit).
          BREAK  
        END

        -- Code removed for example, but the fields are saved to a staging table in the database here...

         -- Respond to Initiator  
        SEND ON CONVERSATION @ConversationHandle MESSAGE TYPE [//IcmsCarePay/Message/Response](@Response);  
        END CONVERSATION @ConversationHandle;  

      COMMIT -- End of Transaction

    END TRY
    BEGIN CATCH
      -- End the conversation
      END CONVERSATION @ConversationHandle WITH CLEANUP  

      -- Get details about the issue.
      SELECT  @Exception = ERROR_MESSAGE(), @Line = ERROR_LINE(), @Proc = ERROR_PROCEDURE(), @Message = 'proc: ' + @Proc + '; line: ' + CAST(@Line AS VARCHAR) + '; msg: ' + @Exception  
      SELECT  @Message -- Displays on Concole when debugging.

      -- Log the issue to the Application Log.
      INSERT  INTO dbo.ApplicationLog
              ( LogDate ,
                Thread ,
                Level ,
                Logger ,
                Message ,
                Exception  
              )
      VALUES  ( GETDATE() , -- LogDate - datetime  
                'None' , -- Thread - varchar(255)  
                'FATAL' , -- Level - varchar(50)  
                '____up_CarePayBrokerReceiveXml' , -- Logger - varchar(255)  
                @Message , -- Message - varchar(4000)  
                @Exception  -- Exception - varchar(2000)  
              )  
      COMMIT -- We have stored the erronous message, and popped it off the queue. Commit these changes.
    END CATCH 
  END  -- end while  

END

Why are these messages staying there?

Details of a message that remain in the Initiator queue are:

Status: 1
Priority: 5
queuing_order: 395
mess_sequence_number: 0
service_name: //IcmsCarePay/Service/Initiator
service_contract_name: //IcmsCarePay/Contract
message_type_name: http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog
message_type_id: 2
validation: E
message_body: NULL
casted_message_body: NULL

Solution

  • Looks like you use one-time dialogs for these conversations. Your target stored proc retrieves messages from target queue and then closes their dialogs, but you don't handle it on the initiator queue.

    Since dialog is a distributed thing, in order to be closed, it has to be closed on both the initiator and the target sides. When your target proc issues end conversation @Handle; on the target, Service Broker sends the message of the type you mentioned to the initiator, to inform it that this particular dialog is history.

    Being done properly, the initiator activation procedure will receive this message, issue the corresponding end conversation on its side, and the dialog is closed.

    As you do not process any messages on the initiator side, these system messages accumulate there.

    2 solutions here are possible:

    1. Handle EndDialog messages. This actually should be done on both sides, because dialog can be closed on its either side.
    2. Re-use dialogs, so that you don't have to create a new one each time you need to send something. It will save some significant resources, especially if the traffic is thick enough.

    Note that #1 should be done regardless of whether you will use persistent or one-time dialogs.

    EDIT: Here is an example of the default processing procedure, taken from one of my projects:

    create procedure [dbo].[ssb_Queue_DefaultProcessor]
    (
        @Handle uniqueidentifier,
        @MessageType sysname,
        @Body xml,
        @ProcId int
    ) with execute as owner as
    
    set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on;
    set numeric_roundabort, xact_abort, implicit_transactions off;
    
    declare @Error int, @ErrorMessage nvarchar(2048);
    
    declare @Action varchar(20);
    
    begin try
    
    -- System stuff
    if @MessageType in (
        N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
        N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
        ) begin
    
        -- Depending on the actual message, action type will be different
        if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' begin
            set @Action = 'PURGE';
        end else if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
            set @Action = 'CLOSE';
    
        -- Close the dialog
        exec dbo.ssb_DialogPools_Maintain @Action = @Action, @DialogHandle = @Handle, @Error = @Error output, @ErrorMessage = @ErrorMessage output;
    
        if nullif(@Error, 0) is not null
            throw 50000, @ErrorMessage, 1;
    
    end else
        -- Some unknown messages may end up here, log them
        throw 50011, 'Unknown message type has been passed into default processor.', 1;
    
    end try
    begin catch
    
    if nullif(@Error, 0) is null
        select @Error = error_number(), @ErrorMessage = error_message();
    
    -- Don't try to resend messages from default processing
    exec dbo.ssb_Poison_Log @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage, @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId;
    
    end catch;
    return;
    

    It is called from all activation procs when they encounter any type of message other than what they are supposed to handle. Below is an example of one of such activation procedures:

    create procedure [dbo].[ssb_QProcessor_Clients]
    with execute as owner as
    
    
    set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on;
    set numeric_roundabort, xact_abort, implicit_transactions off;
    
    declare @Handle uniqueidentifier, @MessageType sysname, @Body xml, @MessageTypeId int;
    declare @Error int, @ErrorMessage nvarchar(2048), @ProcId int = @@procid;
    declare @TS datetime2(4), @Diff int, @Delay datetime;
    
    
    -- Fast entry check for queue contents
    if not exists (select 0 from dbo.ssb_OY_Clients with (nolock))
        return;
    
    while exists (select 0 from sys.service_queues where name = 'ssb_OY_Clients' and is_receive_enabled = 1) begin
    
        begin try
        begin tran;
    
        -- Receive something, if any
        waitfor (
            receive top (1) @Handle = conversation_handle,
                @MessageType = message_type_name,
                @Body = message_body
            from dbo.ssb_OY_Clients
        ), timeout 3000;
    
        if @Handle is null begin
    
            -- Empty, get out
            rollback;
            break;
    
        end;
    
        -- Check for allowed message type
        select @MessageTypeId = mt.Id
        from dbo.ExportMessageTypes mt
            inner join dbo.ExportSystems xs on xs.Id = mt.ExportSystemId
        where mt.MessageTypeName = @MessageType
            and xs.Name = N'AUDIT.OY.Clients';
    
        if @MessageTypeId is not null begin
    
            -- Store the data
            exec dbo.log_Clients @MessageType = @MessageType, @Body = @Body, @Error = @Error output, @ErrorMessage = @ErrorMessage output;
    
            -- Check the result
            if nullif(@Error, 0) is not null
                throw 50000, @ErrorMessage, 1;
    
        end else
            -- Put it into default processor
            exec dbo.ssb_Queue_DefaultProcessor @Handle = @Handle, @MessageType = @MessageType, @Body = @Body, @ProcId = @ProcId;
    
        commit;
        end try
        begin catch
    
        if nullif(@Error, 0) is null
            select @Error = error_number(), @ErrorMessage = error_message();
    
        -- Check commitability of the transaction
        if xact_state() = -1
            rollback;
        else if xact_state() = 1
            commit;
    
        -- Try to resend the message again
        exec dbo.[ssb_Poison_Retry] @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId, @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage;
    
        end catch;
    
        -- Reset dialog handle
        select @Handle = null, @Error = null, @ErrorMessage = null;
    
    end;
    
    -- Done!
    return;
    

    Of course, it's a bit more in this example than you might need, but I hope the general approach is apparent. And you need to handle EndDialog and Error message types on both initiator and target, because you never know where they will appear.