Search code examples
c#asp.netazureazure-webjobsazure-queues

Azure WebJobs: Queue triggers with different batch sizes


I have a WebJob on azure that processes messages from multiple queues at the same time:

public async static Task ProcessQueueMessage1([QueueTrigger("queue1")] string message)
    {


        switch (message.Substring(message.Length - 3, 3))
        {
            case "tze":
                await Parser.Process1(message);
                break;
            default:
                break;
        }
    }


    public async static Task ProcessQueueMessage2([QueueTrigger("queue2")] string message)
    {


        switch (message.Substring(message.Length - 3, 3))
        {
            case "tzr":
                await Parser.Process2(message);
                break;
            default:
                break;
        }
    }

And the in the MAIN

static void Main()
    {

        JobHostConfiguration config = new JobHostConfiguration();
        config.Queues.BatchSize = 3;
        config.Queues.MaxDequeueCount = 1;
        var host = new JobHost(config);
        host.RunAndBlock();

    }

here: message.Substring(message.Length - 3, 3) just checks the extension of the file.

My question is, how would I go on about making the batch size of queue1 different than queue2, can I make second jobhost with a different configuration and have host.RunAndBlock() and host2.RunAndBlock()?? How would I specify what queue should the jobhost do?

I have also tried the Groupqueuetriggers, but unfortunately they take of string, and in my situation I cannot actually pass lists to the queue. :(


Solution

  • In order to tackle this you need to provide a custom implementation of IQueueProcessorFactory. You only need one JobHost.

    There's an example on how to do this here.

        static void Main()
        {
    
            //Configure JobHost
            var config = new JobHostConfiguration();
            config.Queues.BatchSize = 32;
            config.Queues.MaxDequeueCount = 6;
            // Create a custom configuration
            // If you're using DI you should get this from the kernel
            config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
    
            //Pass configuration to JobJost
            var host = new JobHost(config);
            // The following code ensures that the WebJob will be running continuously
            host.RunAndBlock();
        }
    

    And in the CustomQueueProcessorFactory you can insert the custom configuration based on the queue name.

    public class CustomQueueProcessorFactory : IQueueProcessorFactory
    {
        public QueueProcessor Create(QueueProcessorFactoryContext context)
        {
            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }
            if (context.Queue.Name == "queuename1")
            {
                context.MaxDequeueCount = 10;
            }
            else if (context.Queue.Name == "queuename2")
            {
                context.MaxDequeueCount = 10;
                context.BatchSize = 1;
            }
    
            return new QueueProcessor(context);
        }
    }