Search code examples
c#asp.net-coredependency-injectionazure-queues

How to create dependency injection for Azure Storage queues to process each message as it is provided


Trying to read from an Azure Queue to which some other service writes. If I use this in startup.cs

CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=*;AccountKey=*;EndpointSuffix=*");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("*");
queue.CreateIfNotExists();
var message= queue.GetMessage();

I can get the message in 'message' variable, but how to inject this in startup so that my processor class gets called with the message each time there is a new message in the queue. I tried to add a singleton by,

services.AddSinleton<ProcessorClassInterface>(x=> {return new ProcessorClass(queue)});

And then calling queue.GetMessage after every 1 second there.


Solution

  • This was solved by calling a function which uses multi-threading to poll the azure queues after the specified interval of time and fetch the messaged (with probably a set exponential back off time).

    Approach 1: To implement this in a webapp is a bit trickier, and I had to use a hack - call a function from the constructor to get the polling started.

    In startup.cs (inside the configure function), register your service,

    app.ApplicationServices.GetService<IQueueConsumer>();
    

    In ConfigureServices Function, Configuring and creating an object of the polling queue class,

    services.TryAddTransient<IQueueConsumer>(sp => this.GetQueueProcessor(sp));
    

    And then, when the constructor is called to create the object, start polling a queue in a different thread.

    public QueuePollingFunction(
            IOptions<QueueOptions> queueOptions,
            CloudQueue queue)
        {
            this.isEnabled = queueOptions.Value.IsEnabled;
            this.StartPollingQueue(queue);
        }
    
           public override async Task<bool> ProcessMessageAsync(string message)
        {
            bool result = false;
            try
            {
                var messageContent = JsonConvert.DeserializeObject<QueueEntity>(message);
                result = true;
            }
            catch (Exception e)
            {
                Trace.TraceError(e.ToString());
            }
    
            return result;
        }
    
        private async Task StartPollingQueue(CloudQueue queue)
        {
            if (this.isEnabled)
            {
                Task pollQueue = Task.Factory.StartNew(() => Parallel.For(0, this.numberOfParallelTasks, work =>
                {
                    this.Start(queue);
                }));
            }
        }
    
        private async Task Start(CloudQueue queue)
        {
            while (true)
            {
                try
                {
                    CloudQueueMessage retrievedMessage = await queue.GetMessageAsync();
                    if (retrievedMessage != null)
                    {
                        // Fail Logic
                        if (retrievedMessage.DequeueCount > this.maxRetryLimit)
                        {
                            await queue.DeleteMessageAsync(retrievedMessage);
                        }
                        bool isPass = await this.ProcessMessageAsync(newChannelSettings);
                        if (isPass)
                        {
                            await queue.DeleteMessageAsync(retrievedMessage);
                        }
                    }
                    else
                    {
                        // If queue is empty, then the Task can sleep for sleepTime duration
                        await Task.Delay(this.sleepTime);
                    }
                }
                catch (Exception e)
                {
                    Trace.TraceError(e.ToString());
                }
            }
        }
    

    Approach 2: However, later had to move to the optimal approach, which is to use worker-roles and then uses Tasks to run a background thread to perform this task.