Search code examples
c#azure-webjobsazure-servicebus-queuesazure-queuesazure-storage-queues

Create delay between two message reads of a Queue?


I am using Azure Queues to perform a bulk import. I am using WebJobs to perform the process in the background. The queue dequeues very frequently. How do I create a delay between 2 message reads?

This is how I am adding a message to the Queue

public async Task<bool> Handle(CreateFileUploadCommand message)
{
    var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue);

    var brokeredMessage = new BrokeredMessage(JsonConvert.SerializeObject(new ProcessFileUploadMessage
    {
        TenantId = message.TenantId,
        FileExtension = message.FileExtension,
        FileName = message.Name,
        DeviceId = message.DeviceId,
        SessionId = message.SessionId,
        UserId = message.UserId,
        OutletId = message.OutletId,
        CorrelationId = message.CorrelationId,

    }))
    {
        ContentType = "application/json",
    };

    await queueClient.SendAsync(brokeredMessage);

    return true;
}

And Below is the WebJobs Function.

public class Functions
{
    private readonly IValueProvider _valueProvider;
    public Functions(IValueProvider valueProvider)
    {
        _valueProvider = valueProvider;
    }

    public async Task ProcessQueueMessage([ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message,
    TextWriter logger)
    {

        var queueMessage = message.GetBody<string>();

        using (var client = new HttpClient())
        {
            client.BaseAddress = new Uri(_valueProvider.Get("ServiceBaseUri"));

            var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json");

            var result = await client.PostAsync(RestfulUrls.ImportMenu.ProcessUrl, stringContent);

            if (result.IsSuccessStatusCode)
            {
                await message.CompleteAsync();
            }
            else
            {
                await message.AbandonAsync();
            }
        }
    }
}

Solution

  • As far as I know, azure webjobs sdk enable concurrent processing on a single instance(the default is 16).

    If you run your webjobs, it will read 16 queue messages(peeklock and calls Complete on the message if the function finishes successfully, or calls Abandon) and create 16 processes to execute the trigger function at same time. So you feel the queue dequeues very frequently.

    If you want to disable concurrent processing on a single instance.

    I suggest you could set ServiceBusConfiguration's MessageOptions.MaxConcurrentCalls to 1.

    More details, you could refer to below codes:

    In the program.cs:

    JobHostConfiguration config = new JobHostConfiguration();
    ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration();
    serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
    config.UseServiceBus(serviceBusConfig);
    
    JobHost host = new JobHost(config);
    host.RunAndBlock();
    

    If you want to create a delay between 2 message reads, I suggest you could create a custom ServiceBusConfiguration.MessagingProvider.

    It contains CompleteProcessingMessageAsync method, this method completes processing of the specified message, after the job function has been invoked.

    I suggest you could add thread.sleep method in CompleteProcessingMessageAsync to achieve delay read.

    More detail, you could refer to below code sample:

    CustomMessagingProvider.cs:

    Notice: I override the CompleteProcessingMessageAsync method codes.

     public class CustomMessagingProvider : MessagingProvider
        {
            private readonly ServiceBusConfiguration _config;
    
            public CustomMessagingProvider(ServiceBusConfiguration config)
                : base(config)
            {
                _config = config;
            }
    
            public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
            {
                // you could return your own NamespaceManager here, which would be used
                // globally
                return base.CreateNamespaceManager(connectionStringName);
            }
    
            public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
            {
                // you could return a customized (or new) MessagingFactory here per entity
                return base.CreateMessagingFactory(entityPath, connectionStringName);
            }
    
            public override MessageProcessor CreateMessageProcessor(string entityPath)
            {
                // demonstrates how to plug in a custom MessageProcessor
                // you could use the global MessageOptions, or use different
                // options per entity
                return new CustomMessageProcessor(_config.MessageOptions);
            }
    
            private class CustomMessageProcessor : MessageProcessor
            {
                public CustomMessageProcessor(OnMessageOptions messageOptions)
                    : base(messageOptions)
                {
                }
    
                public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
                {
                    // intercept messages before the job function is invoked
                    return base.BeginProcessingMessageAsync(message, cancellationToken);
                }
    
                public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
                {
                    if (result.Succeeded)
                    {
                        if (!MessageOptions.AutoComplete)
                        {
                            // AutoComplete is true by default, but if set to false
                            // we need to complete the message
                            cancellationToken.ThrowIfCancellationRequested();
    
    
                            await message.CompleteAsync();
    
                            Console.WriteLine("Begin sleep");
                            //Sleep 5 seconds
                            Thread.Sleep(5000);
                            Console.WriteLine("Sleep 5 seconds");
    
                        }
                    }
                    else
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await message.AbandonAsync();
                    }
                }
            }
        }
    

    Program.cs main method:

     static void Main()
            {
                var config = new JobHostConfiguration();
    
                if (config.IsDevelopment)
                {
                    config.UseDevelopmentSettings();
                }
    
                var sbConfig = new ServiceBusConfiguration
                {
                    MessageOptions = new OnMessageOptions
                    {
                        AutoComplete = false,
                        MaxConcurrentCalls = 1
                    }
                };
                sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
                config.UseServiceBus(sbConfig);
                var host = new JobHost(config);
    
                // The following code ensures that the WebJob will be running continuously
                host.RunAndBlock();
            }
    

    Result:enter image description here