Tags: azure, azureservicebus, azure-functions-isolated

How to use SessionId with ServiceBusOutput in an Azure isolated worker function


I am using .NET isolated Azure Functions.

I need to send multiple messages to a queue, so I am using the code below:

[Function("process-duplicates")]
public async Task<DispatchedMessages> ProcessDataAsync([Microsoft.Azure.Functions.Worker.HttpTrigger(AuthorizationLevel.Anonymous,
        "post", Route = "myroute")]
    HttpRequest req,
    IAsyncCollector<string> asyncCollector)
{
    await _myService.ProcessDataAsync();

    return new DispatchedMessages
    {
        Messages = _myService.MessagesToBeSent.Select(x => x.ToJson()),
        Status = new OkObjectResult("Processed Data")
    };
}

Where DispatchedMessages is

public class DispatchedMessages
{
    [JsonIgnore]
    [ServiceBusOutput("QueueName", Connection = "event-bus-connection")]
    public IEnumerable<string> Messages { get; set; }

    public IActionResult Status { get; set; }

}

This works fine for a normal queue.

However, I now need to get this working on a session-enabled queue, so I need to provide a value for the SessionId:

var messages = new List<ServiceBusMessage>();

_myService.MessagesToBeSent.ForEach(x =>
{
    var message = new ServiceBusMessage(x.ToJson())
    {
        SessionId = "Test"
    };

    messages.Add(message);
});

var result = new DispatchedMessages
{
    Messages = messages.Select(x => x.ToJson()),
    Status = new OkObjectResult("Processed Data")
};

But this doesn't work.

How can I do this?

Paul


Solution

  • How to use SessionId with ServiceBusOutput on Worker Function azure

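    To send messages to Service Bus with a session ID, send them with the Azure SDK (Azure.Messaging.ServiceBus) instead of the output binding, so that SessionId can be set on each ServiceBusMessage.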

    Function.cs:

        using Azure.Messaging.ServiceBus;
        using Microsoft.AspNetCore.Http;
        using Microsoft.AspNetCore.Mvc;
        using Microsoft.Azure.Functions.Worker;
        using Microsoft.Extensions.Logging;
        
        namespace RithFunctionApp
        {
            public class Function1
            {
                private readonly ILogger<Function1> ri_lg;
                private readonly ServiceBusClient ri_sbc;
        
                public Function1(ILogger<Function1> ri, ServiceBusClient sbc)
                {
                    ri_lg = ri;
                    ri_sbc = sbc;
                }
        
                [Function("Function1")]
                public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequest req)
                {
                    ri_lg.LogInformation("Hello Rithwik Bojja, Function is sending messages");
                    var msg_with_ssId = MessageAndSession();
                    await SendMessageToServiceBus(msg_with_ssId);
                    return new OkObjectResult("Hello Rithwik Bojja, Sent Message with id, please check");
                }
        
                private List<ListTest> MessageAndSession()
                {
                    return new List<ListTest>
                    {
                        new ListTest { Msg = "Rithwik", SessId = "Sess-1" },
                        new ListTest { Msg = "Bojja", SessId = "Sess-2" },
                    };
                }
        
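                private async Task SendMessageToServiceBus(List<ListTest> msgs)
                {
                    // Dispose the sender when done so its underlying link is released.
                    await using ServiceBusSender sender = ri_sbc.CreateSender("rithq1");
                    foreach (var rith in msgs)
                    {
                        // SessionId must be set when the queue has sessions enabled.
                        var ri_msg = new ServiceBusMessage(rith.Msg)
                        {
                            SessionId = rith.SessId
                        };
                        await sender.SendMessageAsync(ri_msg);
                    }
                }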
            }
        
            public class ListTest
            {
                public string Msg { get; set; }
                public string SessId { get; set; }
            }
        }
    

    Here, rithq1 is the queue name.
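
    Since the question is about sending many messages, the per-message send loop could also be replaced by batched sends. The following is only a sketch under assumptions: the class name SessionBatchSender and the method SendWithSessionAsync are hypothetical helpers, not part of the SDK, and the queue name, payloads, and session ID are supplied by the caller.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;

    // Hypothetical helper (not part of the SDK): sends every payload with the
    // same SessionId, packing as many messages as fit into each batch.
    public static class SessionBatchSender
    {
        public static async Task SendWithSessionAsync(
            ServiceBusClient client,
            string queueName,
            IEnumerable<string> payloads,
            string sessionId)
        {
            await using ServiceBusSender sender = client.CreateSender(queueName);
            ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
            try
            {
                foreach (string payload in payloads)
                {
                    var message = new ServiceBusMessage(payload) { SessionId = sessionId };

                    // TryAddMessage returns false when the batch has no room left.
                    if (batch.TryAddMessage(message))
                    {
                        continue;
                    }

                    // Flush the full batch, then retry the message in a fresh one.
                    if (batch.Count > 0)
                    {
                        await sender.SendMessagesAsync(batch);
                    }
                    batch.Dispose();
                    batch = await sender.CreateMessageBatchAsync();

                    if (!batch.TryAddMessage(message))
                    {
                        throw new InvalidOperationException(
                            "A single message is too large to fit in an empty batch.");
                    }
                }

                // Send whatever is left in the last batch.
                if (batch.Count > 0)
                {
                    await sender.SendMessagesAsync(batch);
                }
            }
            finally
            {
                batch.Dispose();
            }
        }
    }
    ```

    With the injected client from the function above, a call might look like `await SessionBatchSender.SendWithSessionAsync(ri_sbc, "rithq1", payloads, "Sess-1");`, where payloads is the list of serialized messages.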

    Program.cs:

        using Azure.Messaging.ServiceBus;
        using Microsoft.Extensions.Hosting;
        using Microsoft.Extensions.DependencyInjection;
        
        var ri = new HostBuilder()
            .ConfigureFunctionsWebApplication()
            .ConfigureServices(rith =>
            {
                rith.AddSingleton((s) =>
                {
                    string rics = Environment.GetEnvironmentVariable("test");
                    return new ServiceBusClient(rics);
                });
            })
            .Build();
        ri.Run();
    

    Here, test is the name of the setting in local.settings.json that holds the Service Bus connection string.

    local.settings.json:

        {
          "IsEncrypted": false,
          "Values": {
            "AzureWebJobsStorage": "UseDevelopmentStorage=true",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
            "test": "Endpoint=sb://rithtest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=6YXOZPAvhY="
          }
        }
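
    One way to check that the session IDs arrive as expected is a session-enabled trigger on the same queue that logs each message. This is a sketch under assumptions: the class name SessionReader and the function name are hypothetical, the queue rithq1 must have been created with sessions enabled, and the Connection value reuses the test setting shown above.

    ```csharp
    using Azure.Messaging.ServiceBus;
    using Microsoft.Azure.Functions.Worker;
    using Microsoft.Extensions.Logging;

    namespace RithFunctionApp
    {
        // Hypothetical verification function, not part of the original answer.
        public class SessionReader
        {
            private readonly ILogger<SessionReader> _logger;

            public SessionReader(ILogger<SessionReader> logger)
            {
                _logger = logger;
            }

            [Function("SessionReader")]
            public void Run(
                [ServiceBusTrigger("rithq1", Connection = "test", IsSessionsEnabled = true)]
                ServiceBusReceivedMessage message)
            {
                // Log the session ID next to the body to confirm what the sender set.
                _logger.LogInformation(
                    "Session {SessionId}: {Body}",
                    message.SessionId,
                    message.Body.ToString());
            }
        }
    }
    ```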
    
