sql-server-2012 service-broker

Can't Close Service Broker Conversations


I am having a problem closing Service Broker conversations and I would like to understand conceptually how things should work. Note that I based my code on Dan Guzman's example at http://www.dbdelta.com/service-broker-external-activator-example/.

Given the following SQL Server 2012 Service Broker configuration:

  • Initiator
  • Initiator Service - internally activated
  • Target
  • Target queue monitor
  • External Activator service (an SSIS app that communicates with a REST service)

This is my understanding of the process flow:

  1. Initiator sends message to Target.
  2. Target queue notification event fires.
  3. External Activator service executes in response to Target queue notification event; issues RECEIVE TOP (1) to retrieve message sent to Target.
  4. External Activator service finishes processing and executes END CONVERSATION @ConversationHandle.
  5. Step 4's END CONVERSATION results in a close message to Initiator; Initiator's internally-activated Service runs in response to the appearance of a message in the Initiator's queue, does its processing and issues an END CONVERSATION @ConversationHandle.

Unfortunately, the Initiator side of the conversation closes but the Target side never does; it is left in a state of DISCONNECTED_INBOUND.
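To see which endpoints are stuck, a query along these lines can help. It is a minimal sketch that relies only on the `sys.conversation_endpoints` and `sys.services` catalog views; the filter on `DISCONNECTED_INBOUND` simply mirrors the state described above.

```sql
-- List conversation endpoints left in DISCONNECTED_INBOUND,
-- together with the local service that owns each endpoint.
SELECT
    ce.conversation_handle
    ,ce.conversation_id
    ,ce.is_initiator            -- 1 = Initiator endpoint, 0 = Target endpoint
    ,s.name AS local_service
    ,ce.far_service
    ,ce.state_desc
    ,ce.lifetime
FROM sys.conversation_endpoints AS ce
JOIN sys.services AS s
    ON s.service_id = ce.service_id
WHERE ce.state_desc = N'DISCONNECTED_INBOUND';
```

The `local_service` column shows whether a stuck endpoint belongs to ABCEventTargetService or to ExternalActivatorService, which makes it easier to tell the application conversation from the event notification conversation.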

Wouldn't the Target side of the conversation close when the External Activator service issues the END CONVERSATION @ConversationHandle? If not, how do you close the Target side of the conversation?

        -- Enabling service broker
        USE master
        ALTER DATABASE my_database
        SET ENABLE_BROKER; 

        --Create Message Types for Request and Response messages

        -- For Request
        CREATE MESSAGE TYPE
        [ABCEventRequestMessage]
        VALIDATION=WELL_FORMED_XML; 

        -- For Response
        CREATE MESSAGE TYPE
        [ABCEventResponseMessage]
        VALIDATION=WELL_FORMED_XML; 

        --Create Contract for the Conversation 

        CREATE CONTRACT [ABCEventContract]
        (
            [ABCEventRequestMessage] SENT BY INITIATOR 
            -- must make the reply message 'SENT BY ANY'
            -- so the External Activator service can
            -- send it to the Initiator after talking
            -- to the webservice
            ,[ABCEventResponseMessage] SENT BY ANY
        );

        --Create Queue for the Initiator
        CREATE QUEUE ABCEventInitiatorQueue
        WITH STATUS = ON,
        ACTIVATION (
          PROCEDURE_NAME = dbo.pci_LogABCEventTransferResult,
          MAX_QUEUE_READERS = 1,
          EXECUTE AS SELF
        ); 

        --Create Queue for the Target
        CREATE QUEUE ABCEventTargetQueue; 

        --Create Queue for the External Activator
       CREATE QUEUE ExternalActivatorQueue;

        --Create Service for the Target and the Initiator.

        --Create Service for the Initiator.
        -- NOTE: not specifying a contract on the service
        -- means that the service can initiate conversations
        -- but it can't be a target for any other services
        CREATE SERVICE [ABCEventInitiatorService]
        ON QUEUE ABCEventInitiatorQueue;

        --Create Service for the Target.
        CREATE SERVICE [ABCEventTargetService] 
        ON QUEUE ABCEventTargetQueue
        ([ABCEventContract]); 

        --Create Service for the External Activator
        CREATE SERVICE ExternalActivatorService
        ON QUEUE ExternalActivatorQueue
        (
          [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification]
        );

      CREATE EVENT NOTIFICATION EventNotificationABCEventTargetQueue
          ON QUEUE ABCEventTargetQueue
          FOR QUEUE_ACTIVATION
          TO SERVICE 'ExternalActivatorService', 'current database';

        IF OBJECT_ID('dbo.pci_InitiateABCEventTransfer', 'P') IS NULL
        BEGIN
          EXEC ('CREATE PROCEDURE dbo.pci_InitiateABCEventTransfer as SELECT 1')
        END;

        ALTER PROC dbo.pci_InitiateABCEventTransfer

        @CompleteTriggerXMLOut XML = NULL OUTPUT
        ,@ConversationHandle uniqueidentifier = NULL OUTPUT
        ,@CompleteTriggerXMLIn XML

        ---------------------------------------------
        --called by application to trigger batch process
        --Sample Usage:
        --
        -- EXEC dbo.pci_InitiateABCEventTransfer @CompleteTriggerXMLIn = 'your_xml_here';
        -- NOTE: when calling this stored procedure from a SQL Server Mgmt
        -- Studio query window, enclose the xml data in single quotes;
        -- if you use double quotes instead, SQL Server thinks
        -- you are specifying a field name in which to save the data
        -- and returns an error like 'Maximum length is 128'
        ---------------------------------------------
        AS
        DECLARE
        --@conv_hand uniqueidentifier
        @message_body varbinary(MAX);

        SET @CompleteTriggerXMLOut = @CompleteTriggerXMLin

        BEGIN TRY

        BEGIN TRAN;

        BEGIN DIALOG CONVERSATION @ConversationHandle
        FROM SERVICE ABCEventInitiatorService
        TO SERVICE 'ABCEventTargetService', 'CURRENT DATABASE'
        ON CONTRACT ABCEventContract
        WITH
        ENCRYPTION = OFF,
        LIFETIME = 6000;

        -- NOTE: because we created our services with a specific
        -- contract, ABCEventContract, that includes specific
        -- message types (ABCEventRequestMessage & ABCEventResponseMessage)
        -- but does not include the DEFAULT message type,
        -- we must include the MESSAGE TYPE clause
        -- of the SEND ON CONVERSATION command; without this clause,
        -- Service Broker will assume the DEFAULT message type
        -- and the SEND will fail, saying, "The message TYPE 'DEFAULT'
        -- is not part of the service contract."

        SEND ON CONVERSATION @ConversationHandle
        MESSAGE TYPE ABCEventRequestMessage
        (@CompleteTriggerXMLOut);

        COMMIT;
        END TRY
        BEGIN CATCH
        -- roll back the open transaction before re-raising the error
        IF @@TRANCOUNT > 0 ROLLBACK;
        THROW;
        END CATCH;

        SELECT @CompleteTriggerXMLOut, @ConversationHandle;

        PRINT 'CompleteTriggerXMLOut = ' + CONVERT(nvarchar(max), @CompleteTriggerXMLOut);

        RETURN @@ERROR; 

        IF OBJECT_ID('dbo.pci_GetABCEventDetails', 'P') IS NULL
        BEGIN
          EXEC ('CREATE PROCEDURE dbo.pci_GetABCEventDetails as SELECT 1')
        END;

        ALTER PROC dbo.pci_GetABCEventDetails
        --------------------------------------
        --called by SSIS package at start ---
        --------------------------------------
        AS
        DECLARE
        @conversation_handle uniqueidentifier
        ,@message_type_name sysname
        ,@message_body xml
        ,@parameter1 int;

        BEGIN TRY

        BEGIN TRAN;

        RECEIVE TOP(1)
        @conversation_handle = conversation_handle
        ,@message_type_name = message_type_name
        ,@message_body = message_body
        FROM dbo.ABCEventTargetQueue;

        IF @@ROWCOUNT = 0
        BEGIN
        RAISERROR ('No messages received from dbo.ABCEventTargetQueue', 16, 1);
        RETURN 1;
        END;

        INSERT INTO dbo.ABCEventTransferLog(
        ConversationHandle
        ,MessageTypeName
        ,MessageBody
        )
        VALUES(
        @conversation_handle
        ,@message_type_name
        ,CAST(@message_body AS varbinary(MAX))
        );

        COMMIT;

        SELECT
            CAST(@message_body AS nvarchar(MAX)) AS WhatIsThis
            ,@conversation_handle AS ConversationHandle
            ,@parameter1 AS Parameter1;

        END TRY
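        BEGIN CATCH
        -- roll back the open transaction so the received message returns to the queue
        IF @@TRANCOUNT > 0 ROLLBACK;
        THROW;
        END CATCH;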

        RETURN @@ERROR;

        IF OBJECT_ID('dbo.pci_CompleteABCEventTransfer', 'P') IS NULL
        BEGIN
          EXEC ('CREATE PROCEDURE dbo.pci_CompleteABCEventTransfer as SELECT 1')
        END;

            ALTER PROC dbo.pci_CompleteABCEventTransfer
          @ConversationHandle uniqueidentifier
                ,@WebserviceResponseStatusCode integer
                ,@WebserviceResponseXML xml
           ------------------------------------------
           -- called by SSIS package at completion
           -- Sample Usage:

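           -- normal completion (webservice returned 201):
           -- EXEC dbo.pci_CompleteABCEventTransfer
           -- @ConversationHandle = '00000000-0000-0000-0000-000000000000'
           -- ,@WebserviceResponseStatusCode = 201
           -- ,@WebserviceResponseXML = 'your_xml_here';

           -- completed with error (any status code other than 201):
           -- EXEC dbo.pci_CompleteABCEventTransfer
           -- @ConversationHandle = '00000000-0000-0000-0000-000000000000'
           -- ,@WebserviceResponseStatusCode = 500
           -- ,@WebserviceResponseXML = 'your_xml_here';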
           ------------------------------------------
         AS

         IF @WebserviceResponseStatusCode <> 201
               -- webservice record creation failed;
               -- complete conversation with error
         BEGIN
           END CONVERSATION @ConversationHandle
           WITH ERROR = 1
           DESCRIPTION = 'Something went horribly wrong';
         END;
               -- webservice created record in remote system;
               -- complete conversation normally
               ELSE
         BEGIN
           END CONVERSATION @ConversationHandle;
         END

         RETURN @@ERROR;

      -- because of circular references, must create a "dummy"
      -- pci_LogABCEventTransferResult stored procedure first,
      -- create the ABCEventInitiatorQueue and then replace
      -- the dummy stored procedure with the real one

      IF OBJECT_ID('dbo.pci_LogABCEventTransferResult', 'P') IS NULL
      BEGIN
        EXEC ('CREATE PROCEDURE dbo.pci_LogABCEventTransferResult as SELECT 1')
      END;

      ALTER PROC dbo.pci_LogABCEventTransferResult
      ---------------------------------------------
      --initiator queue activated proc to process messages
      ---------------------------------------------
      AS
      DECLARE
      @conversation_handle uniqueidentifier
      ,@message_type_name sysname
      ,@message_body varbinary(MAX);
      WHILE 1 = 1
      BEGIN
      WAITFOR (
        RECEIVE TOP (1)
        @conversation_handle = conversation_handle
        ,@message_type_name = message_type_name
        ,@message_body = message_body
        FROM dbo.ABCEventInitiatorQueue
      ), TIMEOUT 1000;
      IF @@ROWCOUNT = 0
      BEGIN
      --exit when no more messages
      RETURN;
      END;

      --log message
      INSERT INTO dbo.ABCEventTransferLog(
        ConversationHandle
        ,MessageTypeName
        ,MessageBody
      )
      VALUES(
        @conversation_handle
        ,@message_type_name
        ,@message_body
      );
      END CONVERSATION @conversation_handle;
      END;

      --log table
      CREATE TABLE dbo.ABCEventTransferLog(
      ConversationHandle uniqueidentifier NOT NULL
      ,MessageTypeName sysname NOT NULL
      ,MessageBody varbinary(MAX) NULL
      ,LogTime datetime2(3) NOT NULL
      CONSTRAINT DF_ServiceBrokerLog_LogTime
      DEFAULT (SYSDATETIME())
      );
      CREATE CLUSTERED INDEX cdx_ABCEventTransferLog ON dbo.ABCEventTransferLog(LogTime);

Solution

  • There are two conversations involved here: your own application conversation, and the system conversation that notifies the External Activator (EA). From your description, you rely on the EA to close the conversation, but that only closes the system conversation. You also need to close the application conversation, using the handle you receive from your Target queue.

    Update:

    Your activated service should issue RECEIVE ... FROM Target and then END CONVERSATION on the @handle it received. Your explanation says this is what you do (step 4), but if that were the case, the Target conversation would be cleaned up properly in 30 minutes.

    However, you are seeing conversations in the DISCONNECTED_INBOUND state. That state means the conversation has received an EndDialog message, but the application has not issued an END CONVERSATION on it.

    So there is a mismatch between what you say the application does and the observed conversation state. I suspect there is a bug in the code, and it does not do exactly what you think it does. Try posting the relevant parts of the code here.
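As an illustration of the point above, here is a minimal sketch of how the Target-side receive could dispatch on message type, so that an EndDialog or Error message arriving on the Target queue also gets an END CONVERSATION. It assumes the queue and message type names from the question. The branching is an assumption about how the application might be structured, not the asker's actual code. The key detail is that the handle comes from the RECEIVE on dbo.ABCEventTargetQueue, not from the event notification delivered to ExternalActivatorQueue.

```sql
DECLARE
    @conversation_handle uniqueidentifier
    ,@message_type_name sysname
    ,@message_body varbinary(MAX)
    ,@received int;

BEGIN TRAN;

-- The handle returned here identifies the application conversation.
RECEIVE TOP (1)
    @conversation_handle = conversation_handle
    ,@message_type_name = message_type_name
    ,@message_body = message_body
FROM dbo.ABCEventTargetQueue;

SET @received = @@ROWCOUNT;

IF @received > 0
   AND @message_type_name IN (
        N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
        ,N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
BEGIN
    -- The far side has already ended (or failed) the conversation;
    -- ending it here releases the Target endpoint.
    END CONVERSATION @conversation_handle;
END
ELSE IF @received > 0
   AND @message_type_name = N'ABCEventRequestMessage'
BEGIN
    -- Application request: return the handle so the caller can pass this
    -- exact value to END CONVERSATION once processing is complete.
    SELECT
        @conversation_handle AS ConversationHandle
        ,CAST(@message_body AS xml) AS RequestBody;
END;

COMMIT;
```

If the value later passed to dbo.pci_CompleteABCEventTransfer is anything other than the ConversationHandle returned by this RECEIVE, the END CONVERSATION there would act on a different conversation and the Target endpoint would stay open.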