Search code examples
sql-serverservice-broker

How to clean up after CREATE DIALOG CONVERSATION on disconnect?


---Edited to include more and better details---

In SQL Server, you can initialize a conversation on a queue:

https://msdn.microsoft.com/en-us/library/ms187377.aspx

Multiple processes will all initialize conversations, store the @dialog_handle in table [dbo].[ActiveConversations], and then WAITFOR a message to arrive (from a trigger on a table).

Here's the gist of what the code is doing (leaving out some irrelevant complications and error checking and such):

Starting the conversation via .NET SqlCommand:

DECLARE @LoginTime datetime;
DECLARE @handle uniqueidentifier;
SELECT TOP 1 @LoginTime = login_time FROM sys.sysprocesses WHERE spid = @@SPID;
BEGIN DIALOG @handle
   FROM SERVICE [myService]
   TO SERVICE 'myService'
   ON CONTRACT myContract
   WITH ENCRYPTION=OFF;

INSERT INTO [dbo].[ActiveConversations]
(
   ConversationHandle,
   SysProcessID,
   SysProcessLoginTime
)
VALUES
(
   @handle,
   @@SPID,
   @LoginTime
)

Ending the conversation via .NET SqlCommand:

DECLARE @handle uniqueidentifier;
SELECT TOP 1 @handle = ConversationHandle
   FROM [dbo].[ActiveConversations] AS conv
   INNER JOIN sys.sysprocesses as sysp
      ON conv.SysProcessLoginTime = sysp.login_time;

DELETE FROM [dbo].[ActiveConversations] WHERE ConversationHandle = @handle;
END CONVERSATION @handle;

Sending messages:

CREATE TRIGGER [dbo].[myTableChanged] ON [dbo].[myTable] AFTER UPDATE
AS
BEGIN
   DECLARE @handle uniqueidentifier;
   DECLARE curs CURSOR LOCAL STATIC READ_ONLY FORWARD_ONLY
   FOR
   SELECT ConversationHandle FROM [dbo].[ActiveConversations];

   OPEN curs
   FETCH NEXT FROM curs INTO @handle;
   WHILE @@FETCH_STATUS = 0
   BEGIN
      BEGIN TRY
         SEND ON CONVERSATION @handle
            MESSAGE TYPE [myType] ( '' );
      END TRY
      BEGIN CATCH
      END CATCH
      FETCH NEXT FROM curs INTO @ConversationHandle;
   END
   CLOSE curs;
   DEALLOCATE curs;
END

Waiting for messages:

DECLARE @handle uniqueidentifier;
SELECT @handle = [ConversationHandle]
    FROM [dbo].ActiveConversations] AS conv
    INNER JOIN sys.sysprocesses AS sysp
    ON sysp.spid = conv.SysProcessID
    AND sysp.login_time = conv.SysProcessLoginTime
    AND sysp.spid = @@SPID;

WAITFOR (RECEIVE * FROM [dbo].[myQueue] WHERE conversation_handle = @handle);

When each process exits, it is responsible for calling END CONVERSATION. However, if a process were to die an untimely death, it will never get a chance to call END CONVERSATION, and the conversation will sit out there forever. Or will it?

Will the conversation be cleaned up automatically, or how do you make sure dead conversations don't accumulate?

What's the best practice in this scenario? Is it to set a specific timeout and then re-initialize the conversation periodically? If the conversation ends unexpectedly, is there a way to find out so it can be deleted from the [dbo].[ActiveConversations] table?


Solution

  • There is no out-of-the-box mechanism to say "has the far side service received messages within some time span?". That said, we can use conversation timers to accomplish it. The gist of it is that the target service, once it gets signal that a new conversation is starting, begins a conversation timer and records that conversation's handle in a table (along with a time stamp). When that timer expires, the activation procedure checks the table to see when the last heartbeat message from the initiator service was processed. If it's outside of a predetermined tolerance, the conversation is ended. In parallel, the initiator service sends heartbeat messages on a cadence letting the target service know that it's still alive.

    Here's the setup code:

    use master;
    go
    if exists (select 1 from sys.databases where name = 'TimerInitiator')
    begin
        alter database [TimerInitiator] set offline with rollback immediate;
        alter database [TimerInitiator] set online;
        drop database [TimerInitiator];
    end
    
    if exists (select 1 from sys.databases where name = 'TimerTarget')
    begin
        alter database [TimerTarget] set offline with rollback immediate;
        alter database [TimerTarget] set online;
        drop database [TimerTarget];
    end
    
    create database [TimerInitiator];
    create database [TimerTarget];
    revert;
    go
    
    use [TimerInitiator];
    create master key encryption by password = 'f00bar!23';
    create user [BrokerUser] without login;
    CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0x308201E33082014CA00302010202101A83EEBC1E132A9149AC7D6D5AA1423F300D06092A864886F70D0101050500302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E3020170D3137303132323034303430315A180F32303939303130313030303030305A302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E30819F300D06092A864886F70D010101050003818D0030818902818100BC4C750343E36206C5A8D672C06D5A204DE499F1CF94A5D678F12DA2F9834877C901AEFE11CA64F9FEA46E31E65FA66FFD80DA139386F3F834C65114025563A9BBD85BDAAFAA694C1D2A36060B380BBD658DD2D93643303F2018F605AAB31840659EF0B5034766FF00F69EF0C1FF9A035686EC81C1E9995C599833ACCEF1BA230203010001300D06092A864886F70D01010505000381810081163FBED4A8B85429C6B6AC7D2123671F751EB72A468CCFBF2C593A8E2A7F51F59584D4EE7ECD247BD73D7A809007C0A8BE23E18A90AC927C124632578F971CD177269EF6752891DDFAB43DC0F1A24D509116EE578BAAC553B81376A69F386B6401AADF1C9D0D2121070B216C864C31B12D8B02081C35D70A8B7DFBD8904DF5 WITH PRIVATE KEY (BINARY = 0x1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
    create message type [Request] validation = none;
    create message type [Response] validation = none;
    create message type [Heartbeat] validation = none;
    create contract [ConversationContract] (
        [Request] sent by initiator,
        [Response] sent by target,
        [Heartbeat] sent by initiator
    );
    create queue [InitiatorQueue] ;
    create service [InitiatorService] 
        authorization [BrokerUser] 
        on queue [InitiatorQueue]
        ( [ConversationContract] );
    
    create remote service binding [TimerRSB]
        authorization [BrokerUser]
        to service 'TargetService'
        with user = [BrokerUser];
    
    declare @broker_instance uniqueidentifier;
    select @broker_instance = service_broker_guid
    from sys.databases
    where name = 'TimerTarget';
    
    declare @sql nvarchar(max);
    set @sql = concat(
    'create route [TimerTargetRoute]
    with service_name = ''TargetService'',
        broker_instance = ''', @broker_instance,
        ''', address = ''LOCAL''');
    exec(@sql)
    go
    
    go
    create procedure [InitiatorActivation]
    as
    begin
        declare @ch uniqueidentifier, @message_type sysname;
    
        while(1=1)
        begin
            waitfor (
                receive top(1) @ch = conversation_handle,
                    @message_type = message_type_name
                from [dbo].[InitiatorQueue]
            ), timeout 1000;
    
            if (@@ROWCOUNT = 0)
                break;
    
            --if (@message_type = 'Response')
            --begin
            --  -- do something to process the message
            --end
            --else 
            if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            begin
                end conversation @ch
            end
        end
    end
    go
    alter queue [dbo].[InitiatorQueue] with activation (
        procedure_name = [dbo].[InitiatorActivation], 
        max_queue_readers = 5, 
        status = on,
        execute as owner
    );
    go
    
    create procedure [dbo].[SendHeartbeat] (
        @ch uniqueidentifier
    )
    as
    begin
        send on conversation (@ch)
            message type [Heartbeat];
    end
    go
    
    use [TimerTarget];
    create master key encryption by password = 'f00bar!23';
    create user [BrokerUser] without login;
    CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0x308201E33082014CA00302010202101A83EEBC1E132A9149AC7D6D5AA1423F300D06092A864886F70D0101050500302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E3020170D3137303132323034303430315A180F32303939303130313030303030305A302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E30819F300D06092A864886F70D010101050003818D0030818902818100BC4C750343E36206C5A8D672C06D5A204DE499F1CF94A5D678F12DA2F9834877C901AEFE11CA64F9FEA46E31E65FA66FFD80DA139386F3F834C65114025563A9BBD85BDAAFAA694C1D2A36060B380BBD658DD2D93643303F2018F605AAB31840659EF0B5034766FF00F69EF0C1FF9A035686EC81C1E9995C599833ACCEF1BA230203010001300D06092A864886F70D01010505000381810081163FBED4A8B85429C6B6AC7D2123671F751EB72A468CCFBF2C593A8E2A7F51F59584D4EE7ECD247BD73D7A809007C0A8BE23E18A90AC927C124632578F971CD177269EF6752891DDFAB43DC0F1A24D509116EE578BAAC553B81376A69F386B6401AADF1C9D0D2121070B216C864C31B12D8B02081C35D70A8B7DFBD8904DF5 WITH PRIVATE KEY (BINARY = 0x1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
    create message type [Request] validation = none;
    create message type [Response] validation = none;
    create message type [Heartbeat] validation = empty;
    create contract [ConversationContract] (
        [Request] sent by initiator,
        [Response] sent by target,
        [Heartbeat] sent by initiator
    );
    create queue [TargetQueue];
    create service [TargetService] 
        authorization [BrokerUser] 
        on queue [TargetQueue]
        ( [ConversationContract] );
    
    create remote service binding [TimerRSB]
        authorization [BrokerUser]
        to service 'InitiatorService'
        with user = [BrokerUser];
    
    
    declare @broker_instance uniqueidentifier;
    select @broker_instance = service_broker_guid
    from sys.databases
    where name = 'TimerInitiator';
    
    declare @sql nvarchar(max);
    set @sql = concat(
    'create route [TimerInitiatorRoute]
    with service_name = ''InitiatorService'',
        broker_instance = ''', @broker_instance,
        ''', address = ''LOCAL''');
    exec(@sql);
    go
    create table [dbo].[OpenConversations] (
        conversation_handle uniqueidentifier not null,
            constraint [PK_OpenConversations] primary key clustered (conversation_handle),
        heartbeat_ts datetime2(3) not null
    );
    go
    create procedure [dbo].[TargetActivation]
    as
    begin
        set nocount on;
        declare @message_type sysname,
            @ch uniqueidentifier,
            @heartbeat_ts datetime2(3);
        while(1=1)
        begin
            waitfor(
                receive top(1) @message_type = message_type_name,
                    @ch = conversation_handle
                from [dbo].[TargetQueue]
            ), timeout 1000;
    
            if (@@ROWCOUNT = 0)
                break;
            if @message_type = 'Request'
            begin
                insert into [dbo].[OpenConversations] 
                    (conversation_handle, heartbeat_ts)
                values
                    (@ch, SYSUTCDATETIME());
                begin conversation timer (@ch) timeout = 30; -- 30 seconds
            end
            else if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer')
            begin
                set @heartbeat_ts = (
                    select [heartbeat_ts] 
                    from [dbo].[OpenConversations] 
                    where [conversation_handle] = @ch
                );
    
                if (datediff(second, @heartbeat_ts, SYSUTCDATETIME()) > 5*60) -- 5 minute time out
                begin
                    end conversation @ch;
                    delete [dbo].[OpenConversations]
                    where [conversation_handle] = @ch;
                end
                else
                begin
                    begin conversation timer (@ch) timeout = 30;
                end
            end
            else if (@message_type = 'Heartbeat')
            begin
                update [dbo].[OpenConversations]
                set [heartbeat_ts] = SYSUTCDATETIME()
                where [conversation_handle] = @ch;
            end
        end
    end
    go
    alter queue [dbo].[TargetQueue] with activation (
        procedure_name = [dbo].[TargetActivation], 
        max_queue_readers = 5, 
        status = on,
        execute as owner
    );
    go
    

    Most of this is service broker plumbing. The "interesting" part is in the activation procedure InitiatorActivation. That's that's where the tracking of timers and whether a given conversation has expired or not is tracked.

    Here's some code to exercise the setup:

    use [TimerInitiator]
    go
    if object_id('tempdb.dbo.#conversations') is not null
        drop table #conversations;
    create table #conversations (conversation_handle uniqueidentifier not null);
    
    declare @ch uniqueidentifier, @i tinyint = 0;
    
    -- start 5 conversations
    while(@i < 5)
    begin
        begin dialog @ch
            from service [InitiatorService]
            to service 'TargetService'
            on contract [ConversationContract];
    
        send on conversation (@ch)
            message type [Request]
            ('<Request />');
    
        insert into #conversations ([conversation_handle])
        values (@ch);
    
        set @i += 1;
    end
    
    go
    
    declare @ch uniqueidentifier = (select top(1) [conversation_handle] from #conversations)
    while (1=1)
    begin
        exec [TimerInitiator].[dbo].[SendHeartbeat] @ch = @ch;
        waitfor delay '0:00:30';
    end
    

    All that's happening here is that five conversations are started and one is arbitrarily chosen to send periodic heartbeats. In another window, you can run one or all of the following to track how things are going:

    select 'T', conversation_handle, message_type_name
    from [TimerTarget].[dbo].[TargetQueue]
    union all 
    select 'I', conversation_handle, message_type_name
    from [TimerInitiator].[dbo].[InitiatorQueue];
    
    select 'T', conversation_handle, state_desc
    from TimerInitiator.sys.conversation_endpoints
    union all
    select 'I', conversation_handle, state_desc
    from TimerTarget.sys.conversation_endpoints;
    
    select *
    from [TimerTarget].[dbo].[OpenConversations];
    

    In your actual setup, it will be your application sending heartbeats. Which it should do regardless of whether it's processing messages or not (i.e. if you hit a period where no messages are sent by the target, your application still needs to send heartbeats). Other than that, this should work with minimal tweaking.