go distributed-computing task-queue

What is the best way to route tasks in machinery (Go)?


I'm trying to use machinery as a distributed task queue and would like to deploy separate workers for different groups of tasks, e.g. a worker next to the database server running database-related tasks and a number of workers on other servers running CPU- and memory-intensive tasks. However, the documentation isn't really clear on how one would do this.

I initially tried running the workers without registering the unwanted tasks on them, but this resulted in the worker repeatedly consuming the unregistered task and requeuing it with the following message:

INFO: 2022/01/27 08:33:13 redis.go:342 Task not registered with this worker. Requeuing message: {"UUID":"task_7026263a-d085-4492-8fa8-e4b83b2c8d59","Name":"add","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"int32","Value":2},{"Name":"","Type":"int32","Value":4}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}

I suspect this can be fixed by setting IgnoreWhenTaskNotRegistered to true; however, this doesn't seem like a very elegant solution.

Task signatures also have a RoutingKey field, but the docs don't explain how to configure a worker to consume only tasks with a specific routing key.

Another option would be to run separate machinery task servers, but this would take away the ability to use workflows and orchestrate tasks between workers.


Solution

  • Found the solution through some trial and error.

    Setting IgnoreWhenTaskNotRegistered to true isn't a correct solution: contrary to what I initially thought, the worker still consumes the unregistered task and then discards it instead of requeuing it.

    The correct way to route tasks is to set RoutingKey in the task's signature to the desired queue's name, and to use taskserver.NewCustomQueueWorker instead of taskserver.NewWorker to get a queue-specific worker object.

    Sending a task to a specific queue:

    task := tasks.Signature{
        Name: "<TASKNAME>",
        RoutingKey: "<QUEUE>",
        Args: []tasks.Arg{
            // args...
        },
    }
    
    res, err := taskserver.SendTask(&task)
    if err != nil {
        // handle error
    }
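
If many task types share a few queues, it may help to keep the task-to-queue mapping in one place rather than hard-coding RoutingKey at every call site. The following is only a sketch of that idea and does not use machinery itself: the `routes` table, the `queueFor` helper, the `save_record` and `send_email` task names, and the queue names `cpu`, `db`, and `default` are all made up for illustration (only the `add` task name comes from the log in the question).

```go
package main

import "fmt"

// routes maps task names to queue names. The "add" task name appears in the
// question's log; "save_record" and the queue names are hypothetical.
var routes = map[string]string{
	"add":         "cpu",
	"save_record": "db",
}

// queueFor returns the queue a task should be routed to, or fallback when
// the task has no explicit entry in routes.
func queueFor(taskName, fallback string) string {
	if queue, ok := routes[taskName]; ok {
		return queue
	}
	return fallback
}

func main() {
	// "send_email" has no route, so it falls back to the given default.
	for _, name := range []string{"add", "save_record", "send_email"} {
		fmt.Printf("%s -> %s\n", name, queueFor(name, "default"))
	}
}
```

The returned value would then be assigned to the signature's RoutingKey field, for example `RoutingKey: queueFor("add", "default")`.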
    

    And starting a worker to consume from a specific queue:

    worker := taskserver.NewCustomQueueWorker("<WORKERNAME>", concurrency, "<QUEUE>")
    if err := worker.Launch(); err != nil {
        // handle error
    }
    

    I'm still not quite sure how to tell a worker to consume from a set of queues, as NewCustomQueueWorker only accepts a single string as its queue name; however, that's a relatively minor detail.