---Edited to include more and better details---
In SQL Server, you can initialize a conversation on a queue:
https://msdn.microsoft.com/en-us/library/ms187377.aspx
Multiple processes will all initialize conversations, store the @dialog_handle in table [dbo].[ActiveConversations], and then WAITFOR a message to arrive (from a trigger on a table).
Here's the gist of what the code is doing (leaving out some irrelevant complications and error checking and such):
Starting the conversation via .NET SqlCommand:
DECLARE @LoginTime datetime;
DECLARE @handle uniqueidentifier;
SELECT TOP 1 @LoginTime = login_time FROM sys.sysprocesses WHERE spid = @@SPID;
BEGIN DIALOG @handle
FROM SERVICE [myService]
TO SERVICE 'myService'
ON CONTRACT myContract
WITH ENCRYPTION=OFF;
INSERT INTO [dbo].[ActiveConversations]
(
ConversationHandle,
SysProcessID,
SysProcessLoginTime
)
VALUES
(
@handle,
@@SPID,
@LoginTime
)
Ending the conversation via .NET SqlCommand:
DECLARE @handle uniqueidentifier;
SELECT TOP 1 @handle = conv.ConversationHandle
FROM [dbo].[ActiveConversations] AS conv
INNER JOIN sys.sysprocesses AS sysp
ON sysp.spid = conv.SysProcessID
AND sysp.login_time = conv.SysProcessLoginTime
AND sysp.spid = @@SPID;
DELETE FROM [dbo].[ActiveConversations] WHERE ConversationHandle = @handle;
END CONVERSATION @handle;
Sending messages:
CREATE TRIGGER [dbo].[myTableChanged] ON [dbo].[myTable] AFTER UPDATE
AS
BEGIN
DECLARE @handle uniqueidentifier;
DECLARE curs CURSOR LOCAL STATIC READ_ONLY FORWARD_ONLY
FOR
SELECT ConversationHandle FROM [dbo].[ActiveConversations];
OPEN curs
FETCH NEXT FROM curs INTO @handle;
WHILE @@FETCH_STATUS = 0
BEGIN
BEGIN TRY
SEND ON CONVERSATION @handle
MESSAGE TYPE [myType] ( '' );
END TRY
BEGIN CATCH
END CATCH
FETCH NEXT FROM curs INTO @handle;
END
CLOSE curs;
DEALLOCATE curs;
END
Waiting for messages:
DECLARE @handle uniqueidentifier;
SELECT @handle = [ConversationHandle]
FROM [dbo].[ActiveConversations] AS conv
INNER JOIN sys.sysprocesses AS sysp
ON sysp.spid = conv.SysProcessID
AND sysp.login_time = conv.SysProcessLoginTime
AND sysp.spid = @@SPID;
WAITFOR (RECEIVE * FROM [dbo].[myQueue] WHERE conversation_handle = @handle);
When each process exits, it is responsible for calling END CONVERSATION. However, if a process dies an untimely death, it never gets a chance to call END CONVERSATION, and the conversation will sit out there forever. Or will it?
Will the conversation be cleaned up automatically, or how do you make sure dead conversations don't accumulate?
What's the best practice in this scenario? Is it to set a specific timeout and then re-initialize the conversation periodically? If the conversation ends unexpectedly, is there a way to find out so it can be deleted from the [dbo].[ActiveConversations] table?
There is no out-of-the-box mechanism for asking "has the far-side service received messages within some time span?" That said, conversation timers can accomplish it. The gist is that the target service, once it gets a signal that a new conversation is starting, begins a conversation timer and records that conversation's handle in a table (along with a timestamp). When the timer expires, the activation procedure checks the table to see when the last heartbeat message from the initiator service was processed. If that is outside a predetermined tolerance, the conversation is ended; otherwise the timer is restarted. In parallel, the initiator service sends heartbeat messages on a regular cadence to let the target service know that it's still alive.
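The expiry decision described above comes down to a single comparison, which can be tried on its own before wiring it into an activation procedure. This is only a minimal sketch: the variable names and the six-minute-old sample timestamp are assumptions made up for illustration, and the five-minute tolerance is just an example value.

```sql
-- Hypothetical stand-alone check; no Service Broker objects are needed to run it.
declare @tolerance_seconds int = 5 * 60;  -- assumed tolerance: 5 minutes
declare @heartbeat_ts datetime2(3) =
    dateadd(minute, -6, sysutcdatetime()); -- sample value: last heartbeat 6 minutes ago

select case
           when datediff(second, @heartbeat_ts, sysutcdatetime()) > @tolerance_seconds
               then 'end conversation'  -- heartbeat is too old; initiator presumed dead
           else 'restart timer'         -- heartbeat is recent enough; check again later
       end as [next_step];
```

With the sample timestamp the heartbeat is older than the tolerance, so the query picks the 'end conversation' branch; changing -6 to -1 picks the other one.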
Here's the setup code:
use master;
go
if exists (select 1 from sys.databases where name = 'TimerInitiator')
begin
alter database [TimerInitiator] set offline with rollback immediate;
alter database [TimerInitiator] set online;
drop database [TimerInitiator];
end
if exists (select 1 from sys.databases where name = 'TimerTarget')
begin
alter database [TimerTarget] set offline with rollback immediate;
alter database [TimerTarget] set online;
drop database [TimerTarget];
end
create database [TimerInitiator];
create database [TimerTarget];
revert;
go
use [TimerInitiator];
create master key encryption by password = 'f00bar!23';
create user [BrokerUser] without login;
CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0xx1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
create message type [Request] validation = none;
create message type [Response] validation = none;
create message type [Heartbeat] validation = none;
create contract [ConversationContract] (
[Request] sent by initiator,
[Response] sent by target,
[Heartbeat] sent by initiator
);
create queue [InitiatorQueue] ;
create service [InitiatorService]
authorization [BrokerUser]
on queue [InitiatorQueue]
( [ConversationContract] );
create remote service binding [TimerRSB]
authorization [BrokerUser]
to service 'TargetService'
with user = [BrokerUser];
declare @broker_instance uniqueidentifier;
select @broker_instance = service_broker_guid
from sys.databases
where name = 'TimerTarget';
declare @sql nvarchar(max);
set @sql = concat(
'create route [TimerTargetRoute]
with service_name = ''TargetService'',
broker_instance = ''', @broker_instance,
''', address = ''LOCAL''');
exec(@sql)
go
create procedure [InitiatorActivation]
as
begin
declare @ch uniqueidentifier, @message_type sysname;
while(1=1)
begin
waitfor (
receive top(1) @ch = conversation_handle,
@message_type = message_type_name
from [dbo].[InitiatorQueue]
), timeout 1000;
if (@@ROWCOUNT = 0)
break;
--if (@message_type = 'Response')
--begin
-- -- do something to process the message
--end
--else
if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
begin
end conversation @ch
end
end
end
go
alter queue [dbo].[InitiatorQueue] with activation (
procedure_name = [dbo].[InitiatorActivation],
max_queue_readers = 5,
status = on,
execute as owner
);
go
create procedure [dbo].[SendHeartbeat] (
@ch uniqueidentifier
)
as
begin
send on conversation (@ch)
message type [Heartbeat];
end
go
use [TimerTarget];
create master key encryption by password = 'f00bar!23';
create user [BrokerUser] without login;
CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0xx1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
create message type [Request] validation = none;
create message type [Response] validation = none;
create message type [Heartbeat] validation = empty;
create contract [ConversationContract] (
[Request] sent by initiator,
[Response] sent by target,
[Heartbeat] sent by initiator
);
create queue [TargetQueue];
create service [TargetService]
authorization [BrokerUser]
on queue [TargetQueue]
( [ConversationContract] );
create remote service binding [TimerRSB]
authorization [BrokerUser]
to service 'InitiatorService'
with user = [BrokerUser];
declare @broker_instance uniqueidentifier;
select @broker_instance = service_broker_guid
from sys.databases
where name = 'TimerInitiator';
declare @sql nvarchar(max);
set @sql = concat(
'create route [TimerInitiatorRoute]
with service_name = ''InitiatorService'',
broker_instance = ''', @broker_instance,
''', address = ''LOCAL''');
exec(@sql);
go
create table [dbo].[OpenConversations] (
conversation_handle uniqueidentifier not null,
constraint [PK_OpenConversations] primary key clustered (conversation_handle),
heartbeat_ts datetime2(3) not null
);
go
create procedure [dbo].[TargetActivation]
as
begin
set nocount on;
declare @message_type sysname,
@ch uniqueidentifier,
@heartbeat_ts datetime2(3);
while(1=1)
begin
waitfor(
receive top(1) @message_type = message_type_name,
@ch = conversation_handle
from [dbo].[TargetQueue]
), timeout 1000;
if (@@ROWCOUNT = 0)
break;
if @message_type = 'Request'
begin
insert into [dbo].[OpenConversations]
(conversation_handle, heartbeat_ts)
values
(@ch, SYSUTCDATETIME());
begin conversation timer (@ch) timeout = 30; -- 30 seconds
end
else if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer')
begin
set @heartbeat_ts = (
select [heartbeat_ts]
from [dbo].[OpenConversations]
where [conversation_handle] = @ch
);
if (datediff(second, @heartbeat_ts, SYSUTCDATETIME()) > 5*60) -- 5 minute time out
begin
end conversation @ch;
delete [dbo].[OpenConversations]
where [conversation_handle] = @ch;
end
else
begin
begin conversation timer (@ch) timeout = 30;
end
end
else if (@message_type = 'Heartbeat')
begin
update [dbo].[OpenConversations]
set [heartbeat_ts] = SYSUTCDATETIME()
where [conversation_handle] = @ch;
end
end
end
go
alter queue [dbo].[TargetQueue] with activation (
procedure_name = [dbo].[TargetActivation],
max_queue_readers = 5,
status = on,
execute as owner
);
go
Most of this is Service Broker plumbing. The "interesting" part is in the target's activation procedure, TargetActivation. That's where the timers are managed and where it's decided whether a given conversation has expired.
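One case the target's activation procedure does not cover is the initiator ending a conversation cleanly (or the conversation failing with an error); the row would then linger in [dbo].[OpenConversations]. One possible way to handle that is sketched below as a helper procedure. The name [dbo].[CloseTrackedConversation] is an assumption and not part of the setup above, and the extra branch shown in the trailing comment is likewise only a suggestion.

```sql
use [TimerTarget];
go
-- Hypothetical helper: end the target's side of a conversation and stop tracking it.
create procedure [dbo].[CloseTrackedConversation] (
    @ch uniqueidentifier
)
as
begin
    set nocount on;

    end conversation @ch;

    delete [dbo].[OpenConversations]
    where [conversation_handle] = @ch;
end
go
-- Possible extra branch inside the receive loop of [dbo].[TargetActivation]:
--   else if (@message_type in (
--       'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
--       'http://schemas.microsoft.com/SQL/ServiceBroker/Error'))
--       exec [dbo].[CloseTrackedConversation] @ch = @ch;
```

Keeping the cleanup in one procedure means the timeout branch could call it as well, so the end-and-delete steps live in a single place.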
Here's some code to exercise the setup:
use [TimerInitiator]
go
if object_id('tempdb.dbo.#conversations') is not null
drop table #conversations;
create table #conversations (conversation_handle uniqueidentifier not null);
declare @ch uniqueidentifier, @i tinyint = 0;
-- start 5 conversations
while(@i < 5)
begin
begin dialog @ch
from service [InitiatorService]
to service 'TargetService'
on contract [ConversationContract];
send on conversation (@ch)
message type [Request]
('<Request />');
insert into #conversations ([conversation_handle])
values (@ch);
set @i += 1;
end
go
declare @ch uniqueidentifier = (select top(1) [conversation_handle] from #conversations)
while (1=1)
begin
exec [TimerInitiator].[dbo].[SendHeartbeat] @ch = @ch;
waitfor delay '0:00:30';
end
All that's happening here is that five conversations are started and one is arbitrarily chosen to send periodic heartbeats. In another window, you can run one or all of the following to track how things are going:
select 'T', conversation_handle, message_type_name
from [TimerTarget].[dbo].[TargetQueue]
union all
select 'I', conversation_handle, message_type_name
from [TimerInitiator].[dbo].[InitiatorQueue];
select 'I', conversation_handle, state_desc
from TimerInitiator.sys.conversation_endpoints
union all
select 'T', conversation_handle, state_desc
from TimerTarget.sys.conversation_endpoints;
select *
from [TimerTarget].[dbo].[OpenConversations];
In your actual setup, it will be your application sending heartbeats, and it should do so regardless of whether it's processing messages (i.e. if you hit a period where no messages are sent by the target, your application still needs to send heartbeats). Other than that, this should work with minimal tweaking.
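For the single-database layout in the question, where each row in [dbo].[ActiveConversations] records the owning session's SPID and login time, orphaned rows could also be swept up directly by comparing against sys.sysprocesses. What follows is a sketch rather than a tested implementation. It assumes it runs periodically (from a scheduled job, say) under a login that can see all sessions in sys.sysprocesses; without that visibility, live sessions would look dead. The cursor name is made up.

```sql
-- Hypothetical sweep: end conversations whose owning session no longer exists.
-- Assumption: the caller can see every session in sys.sysprocesses.
declare @handle uniqueidentifier;

declare dead_conversations cursor local static read_only forward_only
for
    select conv.ConversationHandle
    from [dbo].[ActiveConversations] as conv
    where not exists (
        select 1
        from sys.sysprocesses as sysp
        where sysp.spid = conv.SysProcessID
          and sysp.login_time = conv.SysProcessLoginTime
    );

open dead_conversations;
fetch next from dead_conversations into @handle;

while @@FETCH_STATUS = 0
begin
    begin try
        end conversation @handle;
    end try
    begin catch
        -- The handle may already be gone; the tracking row is removed either way.
    end catch;

    delete from [dbo].[ActiveConversations]
    where ConversationHandle = @handle;

    fetch next from dead_conversations into @handle;
end

close dead_conversations;
deallocate dead_conversations;
```

Matching on both SPID and login time guards against a SPID having been reused by a newer session, which is why the question's table stores both values.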