Tags: design-patterns, message-queue, distributed, dispatcher, job-queue

Job queue with job affinity


I am currently facing a problem for which I am pretty sure there is an official name, but I don't know what to search the web for. I hope that if I describe the problem and the solution I have in mind, somebody can tell me the name of the design pattern (if there is one that matches what I am going to describe).

Basically, what I want to have is a job queue: I have multiple clients that create jobs (publishers), and a number of workers that process these jobs (consumers). Now I want to distribute the jobs created by the publishers to the various consumers, which is basically doable using almost any message queue with load balancing across a queue, e.g. using RabbitMQ or even MQTT 5.

However, now things get complicated... every job refers to an external entity, let's say a user. What I want is that the jobs for a single user get processed in order, but for multiple users in parallel. I do not have the requirement that the jobs for user X always go to worker Y, since they should be processed sequentially anyway.

Now I could solve this using RabbitMQ and its consistent hashing exchange, but then I have a data race when new workers join the cluster, because RabbitMQ does not support relocating jobs that are already in a queue.
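To make the data race concrete, here is a minimal Python sketch of a consistent hash ring. It is an illustration of the general technique, not RabbitMQ's implementation; the `HashRing` class, the worker names and the replica count are all made up for the example. Adding a worker moves only a fraction of the users, but every moved user switches to the new worker, while that user's jobs already sitting in the old worker's queue stay where they are, so old and new worker can end up processing the same user at the same time.

```python
import bisect
import hashlib
import uuid


def _hash(key: str) -> int:
    # Stable 64-bit hash (Python's built-in hash() is salted per process).
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class HashRing:
    """Consistent hash ring with `replicas` virtual nodes per worker."""

    def __init__(self, workers, replicas=100):
        self.replicas = replicas
        self._ring = []  # sorted list of (point, worker)
        for worker in workers:
            self.add(worker)

    def add(self, worker):
        for i in range(self.replicas):
            bisect.insort(self._ring, (_hash(f"{worker}#{i}"), worker))

    def lookup(self, key):
        # First ring point at or after the key's hash, wrapping around the end.
        idx = bisect.bisect(self._ring, (_hash(key),))
        return self._ring[idx % len(self._ring)][1]


ring = HashRing(["worker-a", "worker-b", "worker-c"])
users = [str(uuid.UUID(int=i)) for i in range(10_000)]
before = {u: ring.lookup(u) for u in users}

ring.add("worker-d")  # a new worker joins
moved = [u for u in users if ring.lookup(u) != before[u]]

# Every user that moved now maps to the new worker; its queued jobs did not move.
print(f"{len(moved) / len(users):.0%} of users now map to worker-d")
```

With four equally weighted workers, roughly a quarter of the users change owner; that is the appeal of consistent hashing, but it does nothing about jobs that were routed before the change.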

MQTT 5 does not support this either. There, the idea is known as "sticky shared subscriptions", but it is not part of the official specification. It may be part of MQTT 6, or it may not. Who knows.

I have also taken a look at NSQ, NATS, and some other brokers. Most of them don't support this very specific scenario at all, and those that do use consistent hashing, which has the data race problem mentioned above.

Now, the problem would go away if the broker did not sort jobs into queues as soon as they arrive, but instead tracked whether a job for a specific user is already being processed. If so, it should hold back all other jobs for that user, while jobs for other users continue to be processed. As far as I can see, this is not possible with RabbitMQ et al.
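The broker behavior described here can be sketched in a single process. This is a minimal Python sketch, assuming an in-memory dispatcher shared by worker threads; `AffinityDispatcher` and its `submit`/`next_job`/`complete` methods are hypothetical names, not an existing broker API. Jobs are kept in per-user FIFO queues, and a user only becomes eligible again once its in-flight job has been marked complete.

```python
import collections
import random
import threading
import time


class AffinityDispatcher:
    """At most one job per user in flight; per-user order is preserved."""

    def __init__(self):
        self._pending = collections.defaultdict(collections.deque)  # user -> jobs
        self._ready = collections.deque()  # users with pending jobs, none in flight
        self._busy = set()                 # users with a job in flight
        self._cond = threading.Condition()

    def submit(self, user, job):
        with self._cond:
            queue = self._pending[user]
            queue.append(job)
            # A free user enters the ready list when its first job arrives.
            if user not in self._busy and len(queue) == 1:
                self._ready.append(user)
                self._cond.notify()

    def next_job(self, timeout=None):
        """Block until some user is free; return (user, job), or None on timeout."""
        with self._cond:
            if not self._cond.wait_for(lambda: self._ready, timeout):
                return None
            user = self._ready.popleft()
            self._busy.add(user)
            job = self._pending[user].popleft()
            if not self._pending[user]:
                del self._pending[user]
            return user, job

    def complete(self, user):
        """Release the user; its next job (if any) becomes available."""
        with self._cond:
            self._busy.discard(user)
            if self._pending.get(user):
                self._ready.append(user)
                self._cond.notify()


dispatcher = AffinityDispatcher()
for i in range(5):
    for user in ("alice", "bob", "carol"):
        dispatcher.submit(user, i)

log, running, violations = [], set(), []
log_lock = threading.Lock()


def worker():
    while (item := dispatcher.next_job(timeout=0.2)) is not None:
        user, job = item
        with log_lock:
            if user in running:        # would mean two jobs of one user overlap
                violations.append(user)
            running.add(user)
        time.sleep(random.uniform(0, 0.01))  # simulate work
        with log_lock:
            running.discard(user)
            log.append((user, job))
        dispatcher.complete(user)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because a user is bound to a worker only for the duration of a single job, workers can join or leave without stranding queued jobs. A distributed version would additionally need acknowledgements and redelivery for a worker that dies mid-job.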

I am pretty sure that I am not the only person with a use case for this. For example, I could imagine users uploading videos to a video platform: uploaded videos are processed in parallel, but all the videos uploaded by a single user are processed sequentially.

So, to cut a long story short: is what I describe known under a common name? Something like "distributed job queue"? "Task dispatcher with task affinity"? Or something else? I have tried lots of terms, but without success. This may mean that there is no solution for this, but as I said, it's hard to imagine that I'm the only person on the planet with this problem.

Any ideas what I could look for? And: Are there any tools that implement this? Any protocols?

PS: Just using a predefined routing key is not an option, since the user IDs (which I just used as a made-up example here) are basically UUIDs, so there can be billions of them, and I need something more dynamic. Hence, consistent hashing is basically the right approach, but as I said, the distribution has to happen piece by piece, not upfront, to avoid data races.


Solution

  • what I want to have is a job queue: I have multiple clients that create jobs (publishers), and a number of workers that process these jobs (consumers). Now I want to distribute the jobs created by the publishers to the various consumers, which is basically doable using almost any message queue with load balancing across a queue, e.g. using RabbitMQ or even MQTT 5.

    However, now things get complicated... every job refers to an external entity, let's say a user. What I want is that the jobs for a single user get processed in order, but for multiple users in parallel. I do not have the requirement that the jobs for user X always go to worker Y, since they should be processed sequentially anyway.

    Although it was not for this particular use case, I did a survey of (dynamic) task scheduling [0] [1] a couple of months ago, and nothing like this surfaced.

    Every scheduling algorithm I read about relies on properties that all tasks share, such as priority, age, enqueue time, and task name (and, by extension, average processing time). If your tasks were all linked to a user, you could build a scheduler that takes the user_id into account when picking a task from the queue.
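A minimal Python sketch of such a user-aware pick step, assuming a single in-process queue of `(user_id, task)` tuples; the `pick_next` function is a hypothetical helper, not part of any scheduler library. It takes the oldest task whose user has nothing running, which keeps each user's tasks in order while letting other users proceed.

```python
from collections import deque


def pick_next(queue, busy_users):
    """Remove and return the oldest (user_id, task) whose user is not busy.

    `queue` is a deque in enqueue order; `busy_users` holds the user_ids that
    currently have a task executing. Returns None if every queued task
    belongs to a busy user.
    """
    for i, (user_id, task) in enumerate(queue):
        if user_id not in busy_users:
            del queue[i]  # safe: we return immediately after mutating
            return user_id, task
    return None


queue = deque([("u1", "a"), ("u1", "b"), ("u2", "c")])
busy = set()

first = pick_next(queue, busy)   # ("u1", "a")
busy.add(first[0])
second = pick_next(queue, busy)  # ("u2", "c"): u1 is busy, so "b" waits
busy.add(second[0])
third = pick_next(queue, busy)   # None: only u1's "b" is left and u1 is busy
```

The linear scan is fine for a sketch; a real scheduler would keep per-user sub-queues so a pick does not have to walk past every blocked task.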

    But I guess you don't want to build your own scheduler, and it would be a waste anyway: from experience with this kind of need, existing message queues already let you implement your requirement.

    To summarize, you need:

    A scheduler that runs only one task per user at a time.

    The solution is to use a distributed lock, something like Redis distlock: acquire a per-user lock before the task starts and refresh it regularly while the task executes. If a new task for the same user comes in and tries to execute, it will fail to acquire the lock and will be re-enqueued.

    Here is some pseudocode:

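    def my_task(user_id, *args, **kwargs):
        # app.distlock and RetryTask are placeholders for your framework/lock helper
        lock = app.distlock(user_id)  # per-user distributed lock
        if not lock.acquire(blocking=False):
            raise RetryTask()  # user busy on another worker: re-enqueue for later
        try:
            exec_my_task(user_id, *args, **kwargs)
        finally:
            lock.release()  # always free the user for its next task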
    

    Don't forget to refresh and release the lock.
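The acquire/refresh/release cycle can be illustrated without a Redis server. This is a minimal in-memory sketch, assuming a single process and an injectable clock; the `LeaseLock` class and its method names are hypothetical and only model the semantics. With Redis you would typically acquire with `SET key token NX PX <ttl>` and release with a small Lua script that deletes the key only if it still holds your token, so a worker whose lease already expired cannot release someone else's lock.

```python
import time
import uuid


class LeaseLock:
    """In-memory stand-in for a Redis-style lease lock (one lease per key)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._owners = {}  # key -> (token, expires_at)

    def _live(self, key):
        entry = self._owners.get(key)
        if entry and entry[1] > self._clock():
            return entry
        self._owners.pop(key, None)  # expired or missing
        return None

    def acquire(self, key, ttl):
        """Return an owner token, or None if the key is already held."""
        if self._live(key):
            return None
        token = uuid.uuid4().hex
        self._owners[key] = (token, self._clock() + ttl)
        return token

    def refresh(self, key, token, ttl):
        """Extend the lease only if we still own it (the heartbeat)."""
        entry = self._live(key)
        if not entry or entry[0] != token:
            return False  # lease lost: stop working, another task may run
        self._owners[key] = (token, self._clock() + ttl)
        return True

    def release(self, key, token):
        """Delete the lease only if the stored token is still ours."""
        entry = self._live(key)
        if entry and entry[0] == token:
            del self._owners[key]
            return True
        return False


now = [0.0]  # fake clock so the example is deterministic
locks = LeaseLock(clock=lambda: now[0])

t1 = locks.acquire("user-42", ttl=30)                 # task 1 starts
assert locks.acquire("user-42", ttl=30) is None       # task 2 must re-enqueue
now[0] = 20
assert locks.refresh("user-42", t1, ttl=30)           # heartbeat: expires at 50
now[0] = 45
assert locks.acquire("user-42", ttl=30) is None       # still held thanks to refresh
assert locks.release("user-42", t1)                   # task 1 done
t2 = locks.acquire("user-42", ttl=30)                 # task 2 runs, expires at 75
now[0] = 80                                           # worker 2 crashed, no refresh
assert locks.refresh("user-42", t2, ttl=30) is False  # lease already expired
assert locks.acquire("user-42", ttl=30) is not None   # task 3 may proceed
```

The TTL is a trade-off: too short and the lease can expire while a slow task is still running; too long and a crashed worker blocks that user until the lease runs out.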

    A similar approach is used in crawlers to enforce the robots.txt delay between requests.
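One way a crawler might apply the same idea: treat each host as the lock key and let the "lock" expire after the crawl delay instead of releasing it. This is a minimal single-process Python sketch; `HostThrottle` is a hypothetical name, and a real multi-worker crawler would keep this state in a shared store such as Redis so every worker sees it.

```python
import time
from urllib.parse import urlsplit


class HostThrottle:
    """Allow at most one request per host every `crawl_delay` seconds.

    Equivalent to acquiring a per-host lock with a TTL of `crawl_delay`
    and never releasing it: the key simply expires when the delay has passed.
    """

    def __init__(self, crawl_delay, clock=time.monotonic):
        self.crawl_delay = crawl_delay
        self._clock = clock
        self._blocked_until = {}  # host -> timestamp when the next fetch is allowed

    def try_fetch(self, url):
        host = urlsplit(url).hostname
        now = self._clock()
        if self._blocked_until.get(host, float("-inf")) > now:
            return False  # too soon for this host: re-enqueue the URL for later
        self._blocked_until[host] = now + self.crawl_delay
        return True


now = [0.0]  # fake clock for a deterministic example
throttle = HostThrottle(crawl_delay=5, clock=lambda: now[0])
r1 = throttle.try_fetch("https://example.com/a")  # True
r2 = throttle.try_fetch("https://example.com/b")  # False: same host within 5 s
r3 = throttle.try_fetch("https://example.org/a")  # True: different host
now[0] = 5
r4 = throttle.try_fetch("https://example.com/b")  # True: delay has elapsed
```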