After having looked into a message session based request-reply pattern, I decided to implement a request-reply model that does NOT use message sessions, but solely relies on the ReplyTo
and CorrelationId
message properties as described here:
Simple request/reply: A publisher sends a message into a queue and expects a reply from the message consumer. To receive the reply, the publisher owns a queue into which it expects replies to be delivered. The address of that queue is expressed in the ReplyTo property of the outbound message. When the consumer responds, it copies the MessageId of the handled message into the CorrelationId property of the reply message and delivers the message to the destination indicated by the ReplyTo property.
Now, the problem is that when service A is receiving messages from multiple services B, C, D,... etc., it cannot know beforehand which queue name will be in the ReplyTo
field. As a consequence, A must be able to create ServiceBusSender
instances dynamically for being able to write reply messages to various queues.
This kind of goes against the suggestion to create long-lasting ServiceBusSender
instances:
The Service Bus objects that interact with the service, such as ServiceBusClient, ServiceBusSender, ServiceBusReceiver, and ServiceBusProcessor, should be registered for dependency injection as singletons (or instantiated once and shared). [...] We recommend that you don't close or dispose these objects after sending or receiving each message.
So what's the best way do deal with the above conflictive requirements of dynamic vs. static ServiceBusSender
instance creation when also considering dependency injection?
For creating ServiceBusSender
instances on demand whose lifecycle will still be handled by the dependency injection container, one can use a threadsafe wrapper singleton that keeps track of all created senders.
Something like this:
public class UniversalServiceBusSender : IAsyncDisposable
{
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senderRegistry = new ();
public UniversalServiceBusSender(ServiceBusClient client)
{
_client = client;
}
public Task SendMessageAsync(ServiceBusMessage message, string queueOrTopicName, CancellationToken cancellationToken = default)
{
var sender = _senderRegistry.GetOrAdd(queueOrTopicName, _client.CreateSender);
return sender.SendMessageAsync(message, cancellationToken);
}
public async ValueTask DisposeAsync()
{
await DisposeAsyncCore().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
protected virtual async ValueTask DisposeAsyncCore()
{
if (!_senderRegistry.IsEmpty)
{
var disposeTasks = _senderRegistry.Values.Select(sender => sender.DisposeAsync());
foreach (var disposeTask in disposeTasks)
{
await disposeTask.ConfigureAwait(false);
}
_senderRegistry.Clear();
}
}
}
During startup, the service registration would look like this:
services.AddAzureClients(builder =>
{
builder.AddServiceBusClient(options.AzureServiceBus.ConnectionString);
});
services.AddSingleton<UniversalServiceBusSender>();
The UniversalServiceBusSender
singleton can then be constructor-injected wherever needed.