Search code examples
azure-functionsazureservicebus.net-6.0azure-servicebus-topicsdotnet-isolated

Azure function worker isolated process .NET 6 - work with servicebus and servicebus trigger


I'm working with Azure Function isolated process .net 6.

I have an issue to work with the service bus and service bus trigger.

If in-process I will declare like this

public async Task<ActionResult> Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequest req, ILogger log, ExecutionContext context,
    [ServiceBus("mail-sender", Connection = "ServiceBusConnection")] IAsyncCollector<dynamic> outgoingMessage)
{
    ... outgoingMessage.Send(mymessage);...
}

And then I will have another service bus trigger azure function to process the message like this

   public void Run([ServiceBusTrigger("mail-sender", Connection = "ServiceBusConnection")]string myQueueItem, ILogger log)
        {
            try
            {
                var mailHost = Environment.GetEnvironmentVariable("MAIL_HOST") ?? "smtp.sendgrid.net";
                var mailPort = Convert.ToInt32(Environment.GetEnvironmentVariable("MAIL_PORT") ?? "587");
                var mailUsername = Environment.GetEnvironmentVariable("MAIL_USERNAME") ?? "apikey";
                var mailPassword = Environment.GetEnvironmentVariable("MAIL_PASSWORD") ?? "8755faf7-78c9-4389-b3a5-f1578953bc00";
                var ssl = Convert.ToBoolean(Environment.GetEnvironmentVariable("MAIL_SSL") ?? "false");

                using (var mailHelpers = new MailHelpers(mailHost, mailPort, mailUsername, mailPassword, ssl))
                {
                    var mail = JsonConvert.DeserializeObject<MailViewModel>(myQueueItem);
                    mailHelpers.Send(mail);
                }
            }
            catch (Exception ex)
            {
                log.LogError(ex, "Error during sending email.");
            }
        }

How can I achieve that in the azure function isolated process?

Please help me to give example detail and the package's dependencies if any.

Many thanks

===========================

BTW, I have declare TimerTrigger azure function, it uses https://www.nuget.org/packages/Microsoft.Azure.Functions.Worker.Extensions.Timer It can trigger the task to run, but I cannot debug it? I'm not sure why?

        public async Task<DispatchedMessages> Run([TimerTrigger("* * * * * *")] MyInfo myTimer)
        {
            try
            {...}
        }

Solution

  • As I understand your question, the flow will look like something like this:

    1. post to Httptrigger
    2. the HttpTrigger will queue a message in some service bus queue (for detail about how to post a message to the queue see the documentation) Here is how you can send a message to a service bus queue:
    using Azure.Messaging.ServiceBus;
    
    public class ServiceBusAdapter
        {
            private readonly ServiceBusClient _client;
    
            public ServiceBusAdapter(ServiceBusClient client)
            {
                _client = client;
            }
    
            public async Task SendMessage(string queueName, BinaryData body, string? messageId = null)
            {
                ServiceBusMessage sbMessage = CreateServiceBusMessage(body, messageId);
                await using ServiceBusSender sender = _client.CreateSender(queueName);
    
                try
                {
                    await sender.SendMessageAsync(CreateServiceBusMessage(body, messageId));
                }
                catch (ServiceBusException e) when (e.IsTransient)
                {
                    throw new SomeCustomException(e.Message, e);
                }
            }
    
            public async Task SendMessages(string queueName, IEnumerable<Message> messages)
            {
                Queue<ServiceBusMessage> sbMessages = new(messages.Select(m => CreateServiceBusMessage(m.Body, m.MessageId)));
                await using ServiceBusSender sender = _client.CreateSender(queueName);
    
                // While all messages are not sent to the Service Bus queue
                while (sbMessages.Count > 0)
                {
                    // Start a new batch
                    using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
    
                    // Add the first message to the batch
                    if (messageBatch.TryAddMessage(sbMessages.Peek()))
                    {
                        // Dequeue the message from the .NET queue once the message is added to the batch
                        sbMessages.Dequeue();
                    }
                    else
                    {
                        // If the first message can't fit, then it is too large for the batch.
                        // Try to send it anyway so that we get a proper service bus exception.
                        await sender.SendMessageAsync(sbMessages.Dequeue());
                        continue;
                    }
    
                    // Add as many messages as possible to the current batch
                    while (sbMessages.Count > 0 && messageBatch.TryAddMessage(sbMessages.Peek()))
                    {
                        // Dequeue the message from the .NET queue as it has been added to the batch
                        sbMessages.Dequeue();
                    }
    
                    try
                    {
                        // Now, send the batch
                        await sender.SendMessagesAsync(messageBatch);
                    }
                    catch (ServiceBusException e) when (e.IsTransient)
                    {
                        throw new SomeCustomException(e.Message, e);
                    }
    
                    // If there are any remaining messages in the .NET queue, the while loop repeats
                }
            }
    
            private static ServiceBusMessage CreateServiceBusMessage(BinaryData body, string? messageId)
            {
                var message = new ServiceBusMessage(body);
    
                if (messageId is not null)
                {
                    // Service Bus (Standard and Premium SKU but not Basic) finds
                    // duplicates by this MessageId
                    message.MessageId = messageId;
                }
    
                return message;
            }
        }
    
    1. once the message is queued the service bus will notify the ServiceBusTrigger that is configured and listening to that queue.

    Here is an example of a ServiceBusTrigger in .net 6 isolated functions:

    using Microsoft.Azure.Functions.Worker;
    using Microsoft.Extensions.Logging;
    
    namespace Functions.Sample
    {
        public class SampleServiceBusTrigger
        {
            private readonly ILogger _logger;
            private const string QueueName = "My.Sample.Queue.Name";
    
            private readonly ISomeHandler _someHandler;
    
            public SampleServiceBusTrigger(
                ILoggerFactory loggerFactory,
                ISomeHandler someHandler)
            {
                _logger = loggerFactory.CreateLogger<SampleServiceBusTrigger>();
                _someHandler = someHandler;
            }
    
            [Function(nameof(SampleServiceBusTrigger))]
            public async Task RunAsync(
                [ServiceBusTrigger(QueueName)] MyObject outMessage)
            {
                _logger.LogInformation("Triggered...");
                await _someHandler.Execute(outMessage);
                _logger.LogInformation("Completed.");
            }
        }
    }
    

    You will also need to add these configs to your local.settings.json for local development:

    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "AzureWebJobsServiceBus": "<YOUR SB CONNECTION STRING>",