Search code examples
c#azureazureservicebusservicebusazure-servicebus-subscriptions

Want to have Azure.Messaging.ServiceBus ServiceBusReceiver in the parameters of the trigger


I have a Function App (FA) with a Service Bus (SB) trigger. Previously we used the Microsoft.Azure.ServiceBus library. and we had a MessageReceiver instance between the parameters in the function trigger.

Now, I want to migrate the funcion from Microsoft.Azure.ServiceBus to Azure.Messaging.ServiceBus. Thus, I have tried to use the corresponding receiver in this new library ServiceBusReceiver.

However, when I use this new receiver between the parameters in the trigger (as you can see below) throws an error:

[FunctionName("Test")]        
public async Task Run([ServiceBusTrigger("%topic%", "%subscription%", Connection = "ListenKey")]
    ServiceBusReceivedMessage sbMsg,
    ServiceBusReceiver receiver,
    ILogger log)
{
    // Do Something ..
}

the error thrown is:

The 'Test' function is in error: Microsoft.Azure.WebJobs.Host: Error indexing method 'Test'. Microsoft.Azure.WebJobs.Host: Cannot bind parameter 'receiver' to type ServiceBusReceiver. Make sure the parameter Type is supported by the binding. If you're using binding extensions (e.g. Azure Storage, ServiceBus, Timers, etc.) make sure you've called the registration method for the extension(s) in your startup code (e.g. builder.AddAzureStorage(), builder.AddServiceBus(), builder.AddTimers(), etc.).

The Test function is little complex and I don't want the function itself to decide when to put things in the DeadLetterQueue. Thus, I have set autoComplete to false in the host.json file.

I.e. the host file looks similar to this:

{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": { "autoComplete": false }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": false
      }
    }
  }
}

I did some research and I couldn't find any example for the SB trigger that have the new receiver between the parameters.

One of the solution, that I tried but is not working and I am convinced that it should not work (but I thought to test it any way): is to try to create the receiver inside the function using a ServiceBusClient. But that doesn't work, as expected, and it throws the following error when the receiver mark this message as complete (i.e. when the function calls await receiver.CompleteMessageAsync(sbMessage); ) to remove the message from the bus:

Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance. (MessageLockLost)

Which is, of course, expected as I said since the function itself should lock the messages inside the topic.

So, Any other idea to how to solve this problem?


If that can help, my Test function (after migrating to new SB library) looks similar to this:

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using myNameSpace.Models;
using Microsoft.Extensions.Options;
using Azure.Messaging.ServiceBus;


namespace myNameSpace
{

    public class myClass
    {
        private readonly AppConfig.InternalInformation _info;
        private readonly AppConfig.EventInformation _info2;

        private readonly IConfiguration _configuration;
        

        public myClass(IOptions<AppConfig.InternalInformation> info, IOptions<AppConfig.EventInformation> info2,
            IConfiguration configuration)
        {
            this._configuration = configuration;
            this._info = info.Value;
            this._info2 = info2.Value;
        }

        
        // System A send message to the Service Bus (SB) Topic called TopicX
        // then the below function in function App (is triggered): 
        [FunctionName("Test")]
        public async Task Run(
            [ServiceBusTrigger("%InternalInformation:Topic%", "%InternalInformation:Subscription%",
                Connection = "InternalInformation:ListenKey")]
            ServiceBusReceivedMessage sbMsg,
            ServiceBusReceiver receiver,
            ExecutionContext context,
            ILogger log)
        {
            // 1. Do Something ...
            //          if the below method (DoSomething()) failed  the function will throw an exception.
            //          and SB will retry to trigger this function 10 times
            //          before moving this message to deadletter
            string someMsg = DoSomething(sbMsg);
            
            
            
            // However if everyThing is Ok in the previous method then:
            // 2. Send the message to another SB
            ServiceBusMessage newMsg = new ServiceBusMessage(Encoding.UTF8.GetBytes(someMsg));
            
            ServiceBusClient sendClient = new ServiceBusClient(_info2.SendKey);
            ServiceBusSender sender = sendClient.CreateSender(_info2.Topic);

            await sender.SendMessageAsync(newMsg); //<-- very important: send this new msg to the new topic ONLY ONCE!!!

            await sender.CloseAsync(); // todo test this
            await sendClient.DisposeAsync();
            
            
            
            // 3. try to do another thing
            try
            {
                // Do Something 2...
                //          if the below method (DoSomething2()) failed the function will throw an exception.
                //          In this case, we don't want SB to trigger this function again because
                //          we don't want to do step 2 again and send the same msg to the other topic again (_info2.Topic). 
                DoSomething2(sbMsg);

                // However if it worked, then: 
                // 4a. mark the message as complete and remove it from the bus. 
                // ServiceBusClient recClient = new ServiceBusClient(_info.ListenKey);
                // ServiceBusReceiver rec = recClient.CreateReceiver(_info.Topic, _info.Subscription);
                
                await receiver.CompleteMessageAsync(sbMsg);
                
                await receiver.CloseAsync();
                // await recClient.DisposeAsync();
            }
            catch (Exception e)
            {
                // else if DoSomething2() failed, then: 
                // 4b. move the msg to the DeadLetter Queue. 
                // ServiceBusClient recClient = new ServiceBusClient(_info.ListenKey);
                // ServiceBusReceiver rec = recClient.CreateReceiver(_info.Topic, _info.Subscription);
                
                await receiver.DeadLetterMessageAsync(sbMsg);
                
                await receiver.CloseAsync();
                // await recClient.DisposeAsync();
                
                throw e;
            }

        }
        
        private string DoSomething(ServiceBusReceivedMessage sbMsg)
        {
            // ...
            return "Something";
        }
        
        private void DoSomething2(ServiceBusReceivedMessage sbMsg)
        {
            // ...
        }
    }
}

Solution

  • Creating a separate receiver to complete a message triggered by the function won't work as the manually created receiver will have a different connection not associated with the incoming message.

    With Azure.Messaging.ServiceBus and In-Process Functions SDK the way to invoke message disposition manually is by adding a ServiceBusMessageActions parameter to your function and using it with the message.