
How to properly register and start an Azure Service Bus processor in an Azure App Service


I'm trying to start four Service Bus processors (one for each of four topic subscriptions) in an Azure web app. The code is written in .NET 6. Microsoft's documentation states that the Service Bus processor is the preferred way to receive messages. In their own words, from "Azure Service Bus client library for .NET":

The ServiceBusProcessor can be thought of as an abstraction around a set of receivers. It uses a callback model to allow code to be specified when a message is received and when an exception occurs. It offers automatic completion of processed messages, automatic message lock renewal, and concurrent execution of user specified event handlers. Because of its feature set, it should be the go to tool for writing applications that receive from Service Bus entities.
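For illustration, the features listed in that quote correspond to properties on `ServiceBusProcessorOptions`. The following is a minimal sketch, not a recommended configuration: the connection string, topic name, and subscription name are placeholders, and the values chosen (4 concurrent calls, 10 minutes of lock renewal) are arbitrary assumptions.

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Placeholder connection string; substitute a real one before running.
await using var client = new ServiceBusClient("<< SERVICE BUS CONNECTION STRING >>");

var options = new ServiceBusProcessorOptions
{
    // Complete a message automatically when the handler returns without throwing.
    AutoCompleteMessages = true,
    // Upper bound on how long the message lock keeps being renewed while a handler runs.
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
    // Number of handler invocations that may run concurrently.
    MaxConcurrentCalls = 4,
};

// Topic and subscription names are placeholders as well.
await using ServiceBusProcessor processor =
    client.CreateProcessor("<< TOPIC NAME >>", "<< SUBSCRIPTION NAME >>", options);
```

The processor is only created here. It still needs its `ProcessMessageAsync` and `ProcessErrorAsync` handlers attached before `StartProcessingAsync` is called.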

I have not, however, found documentation on how to register and start these processors. The docs describe how to register sub-clients, but not the processor. This is an extract from the docs (same link as above):

It is also possible to register sub-clients, such as ServiceBusSender and ServiceBusReceiver with DI using the registered ServiceBusClient instance.

services.AddAzureClients(builder =>
{
    builder.AddServiceBusClient("<< SERVICE BUS CONNECTION STRING >>");

    foreach (var topicName in topicNames)
    {
        builder.AddClient<ServiceBusSender, ServiceBusClientOptions>((_, _, provider) =>
            provider
                .GetService<ServiceBusClient>()
                .CreateSender(topicName)
        )
        .WithName(topicName);
    }
});

You can then use it as follows:

public class ServiceBusSendingController : ControllerBase {
    private readonly ServiceBusSender _sender;

    public ServiceBusSendingController(IAzureClientFactory<ServiceBusSender> serviceBusSenderFactory)
    {
        // Though the method is called "CreateClient", the factory will manage the sender as a
        // singleton, creating a new instance only on the first use.
        _sender = serviceBusSenderFactory.CreateClient("<< TOPIC NAME >>");
    }
}

This is not described for Service Bus processors, however. For those, the docs give the following code example but say nothing about registration.

// create a processor that we can use to process the messages
await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options);

// configure the message and error handler to use
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine(body);

    // we can evaluate application logic and use that to determine how to settle the message.
    await args.CompleteMessageAsync(args.Message);
}

Task ErrorHandler(ProcessErrorEventArgs args)
{
    // the error source tells me at what point in the processing an error occurred
    Console.WriteLine(args.ErrorSource);
    // the fully qualified namespace is available
    Console.WriteLine(args.FullyQualifiedNamespace);
    // as well as the entity path
    Console.WriteLine(args.EntityPath);
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

// start processing
await processor.StartProcessingAsync();

Do they expect us to only register the Azure Service Bus client and then start a background service where we create and start the Azure Service Bus processors? Is this the preferred way to work? Or am I missing something?

If you need extra info, feel free to ask.


Solution

  • Do they expect us to only register the Azure Service Bus client and then start a background service where we create and start the Azure Service Bus processors? Is this the preferred way to work? Or am I missing something?

    Yes. You register the Service Bus client and then create and start the processors in a hosted background service, as shown below. I followed the Microsoft documentation:

    Program.cs:

    using Azure.Messaging.ServiceBus;

    var rith = WebApplication.CreateBuilder(args);
    rith.Services.AddRazorPages();
    // Register a single shared ServiceBusClient built from the "rithcon" connection string.
    rith.Services.AddSingleton<ServiceBusClient>(provider =>
    {
        var ri_cs = rith.Configuration["rithcon"];
        return new ServiceBusClient(ri_cs);
    });
    // The hosted service creates and starts the processors when the app starts.
    rith.Services.AddHostedService<RithwikTestBusProcess>();
    ------------
    ------------
    ------------
    app.Run();
    

    RithwikTestBusProcess.cs:

    using Azure.Messaging.ServiceBus;
    public class RithwikTestBusProcess : BackgroundService
    {
        private readonly ILogger<RithwikTestBusProcess> ri_lg;
        private readonly List<ServiceBusProcessor> ri_pr = new List<ServiceBusProcessor>();
        private readonly ServiceBusClient ri_scl;
        
        public RithwikTestBusProcess(ServiceBusClient sb, ILogger<RithwikTestBusProcess> rilg)
        {
            ri_scl = sb;
            ri_lg = rilg;
        }
        private readonly Dictionary<string, string> Rith_Topics_Subs = new Dictionary<string, string>
        {
            { "rithtopic1", "rithsub1" },
            { "ritopic2", "rithsub2" }
        };
        protected override async Task ExecuteAsync(CancellationToken stop_tok)
        {
            // Create, wire up, and start one processor per topic/subscription pair.
            foreach (var rith in Rith_Topics_Subs)
            {
                var ri_topic_name = rith.Key;
                var ri_subsptn_name = rith.Value;
                var ri_proc = ri_scl.CreateProcessor(ri_topic_name, ri_subsptn_name, new ServiceBusProcessorOptions());
                // Both handlers must be attached before the processor is started.
                ri_proc.ProcessMessageAsync += ProcessMessageHandler;
                ri_proc.ProcessErrorAsync += ProcessErrorHandler;
                // Keep a reference so StopAsync can stop the processor later.
                ri_pr.Add(ri_proc);
                await ri_proc.StartProcessingAsync(stop_tok);
            }
        }
        private Task ProcessMessageHandler(ProcessMessageEventArgs ch)
        {
            ri_lg.LogInformation($"Hello Rithwik Bojja, the message received from the Service Bus topic is: {ch.Message.Body}");
            return ch.CompleteMessageAsync(ch.Message);
        }
    
        private Task ProcessErrorHandler(ProcessErrorEventArgs ch)
        {
            ri_lg.LogError($"Hello Rithwik Bojja, the exception is: {ch.Exception.Message}");
            return Task.CompletedTask;
        }
    
        public override async Task StopAsync(CancellationToken stop_tok)
        {
            foreach (var ch in ri_pr)
            {
                // Stop receiving, then release the processor's underlying resources.
                await ch.StopProcessingAsync(stop_tok);
                await ch.DisposeAsync();
            }
            await base.StopAsync(stop_tok);
        }
    }
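As an alternative sketch, the sender registration pattern quoted in the question can be adapted to processors: register one named `ServiceBusProcessor` per topic with `AddAzureClients`, then resolve them through `IAzureClientFactory<ServiceBusProcessor>` in a hosted service. This is an adaptation under assumptions, not something the documentation shows for processors. The `"ServiceBus"` configuration key, the `Subscriptions` and `ProcessorHost` types, and the topic and subscription names are hypothetical, and the file relies on the implicit usings of a .NET 6 web project.

```csharp
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Azure;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddAzureClients(clients =>
{
    // "ServiceBus" is an assumed configuration key holding the connection string.
    clients.AddServiceBusClient(builder.Configuration["ServiceBus"]);

    // One named processor per topic subscription, mirroring the sender example.
    foreach (var (topic, subscription) in Subscriptions.All)
    {
        clients.AddClient<ServiceBusProcessor, ServiceBusClientOptions>((_, _, provider) =>
                provider.GetRequiredService<ServiceBusClient>()
                        .CreateProcessor(topic, subscription))
            .WithName(topic);
    }
});
builder.Services.AddHostedService<ProcessorHost>();

var app = builder.Build();
app.Run();

// Hypothetical topic -> subscription pairs.
public static class Subscriptions
{
    public static readonly IReadOnlyDictionary<string, string> All =
        new Dictionary<string, string>
        {
            ["topic-a"] = "subscription-a",
            ["topic-b"] = "subscription-b",
        };
}

// Hypothetical hosted service that starts and stops the registered processors.
public sealed class ProcessorHost : IHostedService
{
    private readonly List<ServiceBusProcessor> _processors = new();
    private readonly ILogger<ProcessorHost> _logger;

    public ProcessorHost(
        IAzureClientFactory<ServiceBusProcessor> factory,
        ILogger<ProcessorHost> logger)
    {
        _logger = logger;
        foreach (var topic in Subscriptions.All.Keys)
        {
            // Resolves the processor registered under this name.
            _processors.Add(factory.CreateClient(topic));
        }
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        foreach (var processor in _processors)
        {
            // Handlers must be attached before the processor is started.
            processor.ProcessMessageAsync += OnMessageAsync;
            processor.ProcessErrorAsync += OnErrorAsync;
            await processor.StartProcessingAsync(cancellationToken);
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        foreach (var processor in _processors)
        {
            await processor.StopProcessingAsync(cancellationToken);
        }
    }

    private Task OnMessageAsync(ProcessMessageEventArgs args)
    {
        _logger.LogInformation("Received from {Entity}: {Body}",
            args.EntityPath, args.Message.Body.ToString());
        return args.CompleteMessageAsync(args.Message);
    }

    private Task OnErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Error from {Source} on {Entity}",
            args.ErrorSource, args.EntityPath);
        return Task.CompletedTask;
    }
}
```

Compared with creating the processors inside the background service, this keeps the registration next to the client registration, at the cost of looking processors up by name. Either way, something still has to call `StartProcessingAsync`, so a hosted service is needed in both approaches.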
    
    

    appsettings.json:

    {
      "Logging": {
        ------
        ------
      },
      "AllowedHosts": "*",
      "rithcon": "Endpoint=sb://rithtest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=sTQkT5zdQ="
    }
    
